Add streaming job support and job state management. 1. Add the streaming job function. 2. Add the job state management function. 3. Update some unit tests. 4. Fix a few bugs.
Author: ahutsunshine <[email protected]> Closes #285 from ahutsunshine/master. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/6be53303 Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/6be53303 Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/6be53303 Branch: refs/heads/master Commit: 6be53303523ef67513354070b15981ee645941ac Parents: 32f080d Author: ahutsunshine <[email protected]> Authored: Thu May 31 15:12:40 2018 +0800 Committer: Lionel Liu <[email protected]> Committed: Thu May 31 15:12:40 2018 +0800 ---------------------------------------------------------------------- .../core/config/EclipseLinkJpaConfig.java | 2 - .../griffin/core/config/PropertiesConfig.java | 15 +- .../core/exception/GriffinExceptionMessage.java | 13 +- .../griffin/core/job/BatchJobOperatorImpl.java | 310 +++++ .../griffin/core/job/FileExistPredicator.java | 8 +- .../apache/griffin/core/job/JobController.java | 24 +- .../apache/griffin/core/job/JobInstance.java | 130 +- .../apache/griffin/core/job/JobOperator.java | 38 + .../org/apache/griffin/core/job/JobService.java | 18 +- .../apache/griffin/core/job/JobServiceImpl.java | 678 +++++----- .../apache/griffin/core/job/SparkSubmitJob.java | 81 +- .../core/job/StreamingJobOperatorImpl.java | 265 ++++ .../griffin/core/job/entity/AbstractJob.java | 52 +- .../griffin/core/job/entity/BatchJob.java | 43 + .../griffin/core/job/entity/GriffinJob.java | 86 -- .../griffin/core/job/entity/JobDataBean.java | 24 +- .../core/job/entity/JobInstanceBean.java | 66 +- .../griffin/core/job/entity/JobSchedule.java | 29 +- .../griffin/core/job/entity/JobState.java | 65 + .../core/job/entity/LivySessionStates.java | 117 +- .../core/job/entity/SegmentPredicate.java | 11 +- .../griffin/core/job/entity/StreamingJob.java | 36 + .../griffin/core/job/entity/VirtualJob.java | 2 + .../griffin/core/job/repo/BatchJobRepo.java | 25 + 
.../griffin/core/job/repo/GriffinJobRepo.java | 25 - .../griffin/core/job/repo/JobInstanceRepo.java | 10 +- .../griffin/core/job/repo/StreamingJobRepo.java | 6 + .../measure/ExternalMeasureOperationImpl.java | 80 -- .../measure/ExternalMeasureOperatorImpl.java | 81 ++ .../measure/GriffinMeasureOperationImpl.java | 66 - .../measure/GriffinMeasureOperatorImpl.java | 68 + .../griffin/core/measure/MeasureController.java | 14 +- .../griffin/core/measure/MeasureOperation.java | 33 - .../griffin/core/measure/MeasureOperator.java | 34 + .../griffin/core/measure/MeasureService.java | 7 +- .../core/measure/MeasureServiceImpl.java | 34 +- .../core/measure/entity/DataConnector.java | 51 +- .../griffin/core/measure/entity/DataSource.java | 54 +- .../griffin/core/measure/entity/DqType.java | 32 + .../core/measure/entity/EvaluateRule.java | 2 +- .../core/measure/entity/ExternalMeasure.java | 3 + .../core/measure/entity/GriffinMeasure.java | 19 +- .../griffin/core/measure/entity/Measure.java | 15 +- .../griffin/core/measure/entity/Rule.java | 17 +- .../measure/entity/StreamingPreProcess.java | 111 ++ .../core/metastore/hive/HiveMetaStoreProxy.java | 2 - .../hive/HiveMetaStoreServiceImpl.java | 1 - .../griffin/core/metric/model/Metric.java | 11 +- .../org/apache/griffin/core/util/FileUtil.java | 59 + .../org/apache/griffin/core/util/JsonUtil.java | 18 + .../org/apache/griffin/core/util/TimeUtil.java | 2 +- .../apache/griffin/core/util/YarnNetUtil.java | 70 + .../src/main/resources/application.properties | 3 + service/src/main/resources/env/env_batch.json | 59 + .../src/main/resources/env/env_streaming.json | 60 + service/src/main/resources/sparkJob.properties | 2 +- .../griffin/core/job/JobControllerTest.java | 9 +- .../core/job/JobInstanceBeanRepoTest.java | 41 +- .../griffin/core/job/JobInstanceTest.java | 56 +- .../griffin/core/job/JobServiceImplTest.java | 1205 +++++++++--------- .../griffin/core/job/SparkSubmitJobTest.java | 101 +- .../core/job/repo/JobInstanceRepoTest.java | 
50 +- .../griffin/core/job/repo/JobRepoTest.java | 6 +- .../ExternalMeasureOperationImplTest.java | 102 -- .../ExternalMeasureOperatorImplTest.java | 102 ++ .../GriffinMeasureOperationImplTest.java | 109 -- .../measure/GriffinMeasureOperatorImplTest.java | 109 ++ .../core/measure/MeasureControllerTest.java | 10 +- .../core/measure/MeasureOrgServiceImplTest.java | 2 +- .../core/measure/MeasureServiceImplTest.java | 33 +- .../hive/HiveMetaStoreServiceImplTest.java | 2 +- .../core/metric/MetricControllerTest.java | 3 +- .../apache/griffin/core/util/EntityHelper.java | 16 +- .../apache/griffin/core/util/JsonUtilTest.java | 5 +- 74 files changed, 3275 insertions(+), 1873 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java b/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java index 29ff5cc..85a5b25 100644 --- a/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java +++ b/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java @@ -52,8 +52,6 @@ public class EclipseLinkJpaConfig extends JpaBaseConfiguration { Map<String, Object> map = new HashMap<>(); map.put(PersistenceUnitProperties.WEAVING, "false"); map.put(PersistenceUnitProperties.DDL_GENERATION, "create-or-extend-tables"); -// map.put("eclipselink.logging.level", "FINEST"); -// map.put("eclipselink.logging.parameters", "true"); return map; } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java ---------------------------------------------------------------------- diff --git 
a/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java b/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java index 0f27513..8106ae2 100644 --- a/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java +++ b/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java @@ -19,6 +19,7 @@ under the License. package org.apache.griffin.core.config; +import org.apache.griffin.core.util.FileUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; @@ -26,7 +27,9 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; +import javax.annotation.PostConstruct; import java.io.FileNotFoundException; +import java.io.IOException; import java.util.Properties; import static org.apache.griffin.core.util.PropertiesUtil.getConf; @@ -40,10 +43,20 @@ public class PropertiesConfig { private String location; public PropertiesConfig(@Value("${external.config.location}") String location) { - LOGGER.info("external.config.location : {}", location); + LOGGER.info("external.config.location : {}", location != null ? 
location : "null"); this.location = location; } +// @PostConstruct +// public void init() throws IOException { +// String batchName = "env_batch.json"; +// String batchPath = "env/" + batchName; +// String streamingName = "env_streaming.json"; +// String streamingPath = "env/" + streamingName; +// FileUtil.readBatchEnv(batchPath, batchName); +// FileUtil.readStreamingEnv(streamingPath, streamingName); +// } + @Bean(name = "appConf") public Properties appConf() { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java index 7b9c06c..14d987d 100644 --- a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java +++ b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java @@ -32,13 +32,22 @@ public enum GriffinExceptionMessage { INVALID_METRIC_VALUE_FORMAT(40008, "Metric value format is invalid"), INVALID_MEASURE_ID(40009, "Property 'measure.id' is invalid"), INVALID_CRON_EXPRESSION(40010, "Property 'cron.expression' is invalid"), + MEASURE_TYPE_DOES_NOT_SUPPORT(40011, "We don't support such measure type."), + JOB_TYPE_DOES_NOT_SUPPORT(40011, "We don't support such job type."), + STREAMING_JOB_IS_RUNNING(40012, "There is no need to start again as job is RUNNING."), + STREAMING_JOB_IS_STOPPED(40012, "There is no need to stop again as job is STOPPED."), + JOB_IS_NOT_SCHEDULED(40013, "The job isn't scheduled."), + JOB_IS_NOT_IN_PAUSED_STATUS(40014, "The job isn't in paused status."), + JOB_IS_IN_PAUSED_STATUS(40015, "The job is already in paused status."), //404, "Not Found" MEASURE_ID_DOES_NOT_EXIST(40401, "Measure id does not exist"), JOB_ID_DOES_NOT_EXIST(40402, 
"Job id does not exist"), JOB_NAME_DOES_NOT_EXIST(40403, "Job name does not exist"), - ORGANIZATION_NAME_DOES_NOT_EXIST(40404, "Organization name does not exist"), - HDFS_FILE_NOT_EXIST(40405, "Hadoop data file not exist"), + NO_SUCH_JOB_ACTION(40404, "No such job action"), + JOB_KEY_DOES_NOT_EXIST(40405, "Job key which consists of group and name does not exist."), + ORGANIZATION_NAME_DOES_NOT_EXIST(40406, "Organization name does not exist"), + HDFS_FILE_NOT_EXIST(40407, "Hadoop data file not exist"), //409, "Conflict" MEASURE_NAME_ALREADY_EXIST(40901, "Measure name already exists"), http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java new file mode 100644 index 0000000..e8d0cd9 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java @@ -0,0 +1,310 @@ +package org.apache.griffin.core.job; + +import org.apache.griffin.core.exception.GriffinException; +import org.apache.griffin.core.job.entity.*; +import org.apache.griffin.core.job.repo.BatchJobRepo; +import org.apache.griffin.core.job.repo.JobInstanceRepo; +import org.apache.griffin.core.measure.entity.DataSource; +import org.apache.griffin.core.measure.entity.GriffinMeasure; +import org.quartz.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.quartz.SchedulerFactoryBean; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; + +import java.util.*; + +import static 
org.apache.griffin.core.exception.GriffinExceptionMessage.*; +import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH; +import static org.quartz.CronExpression.isValidExpression; +import static org.quartz.JobKey.jobKey; +import static org.quartz.Trigger.TriggerState; +import static org.quartz.Trigger.TriggerState.*; +import static org.quartz.TriggerKey.triggerKey; + +@Service +public class BatchJobOperatorImpl implements JobOperator { + private static final Logger LOGGER = LoggerFactory.getLogger(BatchJobOperatorImpl.class); + + @Autowired + private SchedulerFactoryBean factory; + @Autowired + private JobInstanceRepo instanceRepo; + @Autowired + private BatchJobRepo batchJobRepo; + @Autowired + private JobServiceImpl jobService; + + @Override + @Transactional(rollbackFor = Exception.class) + public JobSchedule add(JobSchedule js, GriffinMeasure measure) throws Exception { + validateParams(js, measure); + String qName = jobService.getQuartzName(js); + String qGroup = jobService.getQuartzGroup(); + TriggerKey triggerKey = jobService.getTriggerKeyIfValid(qName, qGroup); + BatchJob batchJob = new BatchJob(js.getMeasureId(), js.getJobName(), qName, qGroup, false); + batchJob.setJobSchedule(js); + batchJob = batchJobRepo.save(batchJob); + jobService.addJob(triggerKey, js, batchJob, BATCH); + JobSchedule jobSchedule = batchJob.getJobSchedule(); + jobSchedule.setId(batchJob.getId()); + return jobSchedule; + } + + /** + * all states: BLOCKED COMPLETE ERROR NONE NORMAL PAUSED + * to start states: PAUSED + * to stop states: BLOCKED NORMAL + * + * @param job streaming job + */ + @Override + public void start(AbstractJob job) { + String name = job.getName(); + String group = job.getGroup(); + TriggerState state = getTriggerState(name, group); + if (state == null) { + throw new GriffinException.BadRequestException(JOB_IS_NOT_SCHEDULED); + } + /* If job is not in paused state,we can't start it as it may be RUNNING. 
*/ + if (state != PAUSED) { + throw new GriffinException.BadRequestException(JOB_IS_NOT_IN_PAUSED_STATUS); + } + JobKey jobKey = jobKey(name, group); + try { + factory.getScheduler().resumeJob(jobKey); + } catch (SchedulerException e) { + throw new GriffinException.ServiceException("Failed to start job.", e); + } + } + + @Override + public void stop(AbstractJob job) { + pauseJob((BatchJob) job, false); + } + + @Override + @Transactional + public void delete(AbstractJob job) { + pauseJob((BatchJob) job, true); + } + + + @Override + public JobHealth getHealth(JobHealth jobHealth, AbstractJob job) throws SchedulerException { + List<? extends Trigger> triggers = jobService.getTriggers(job.getName(), job.getGroup()); + if (!CollectionUtils.isEmpty(triggers)) { + jobHealth.setJobCount(jobHealth.getJobCount() + 1); + if (jobService.isJobHealthy(job.getId())) { + jobHealth.setHealthyJobCount(jobHealth.getHealthyJobCount() + 1); + } + } + return jobHealth; + } + + @Override + public JobState getState(AbstractJob job, JobDataBean bean, String action) throws SchedulerException { + JobState jobState = new JobState(); + Scheduler scheduler = factory.getScheduler(); + TriggerKey triggerKey = triggerKey(job.getName(), job.getGroup()); + TriggerState triggerState = scheduler.getTriggerState(triggerKey); + jobState.setState(triggerState.toString()); + jobState.setToStart(getStartStatus(triggerState)); + jobState.setToStop(getStopStatus(triggerState)); + return jobState; + } + + /** + * only PAUSED state of job can be started + * @param state job state + * @return true: job can be started, false: job is running which cannot be started + */ + private boolean getStartStatus(TriggerState state) { + return state == PAUSED; + } + + /** + * only NORMAL or BLOCKED state of job can be started + * @param state job state + * @return true: job can be stopped, false: job is running which cannot be stopped + */ + private boolean getStopStatus(TriggerState state) { + return state == NORMAL || 
state == BLOCKED; + } + + + private TriggerState getTriggerState(String name, String group) { + try { + List<? extends Trigger> triggers = jobService.getTriggers(name, group); + if (CollectionUtils.isEmpty(triggers)) { + return null; + } + TriggerKey key = triggers.get(0).getKey(); + return factory.getScheduler().getTriggerState(key); + } catch (SchedulerException e) { + LOGGER.error("Failed to delete job", e); + throw new GriffinException.ServiceException("Failed to delete job", e); + } + + } + + + /** + * @param job griffin job + * @param delete if job needs to be deleted,set isNeedDelete true,otherwise it just will be paused. + */ + private void pauseJob(BatchJob job, boolean delete) { + try { + pauseJob(job.getGroup(), job.getName()); + pausePredicateJob(job); + job.setDeleted(delete); + batchJobRepo.save(job); + } catch (Exception e) { + LOGGER.error("Job schedule happens exception.", e); + throw new GriffinException.ServiceException("Job schedule happens exception.", e); + } + } + + private void pausePredicateJob(BatchJob job) throws SchedulerException { + List<JobInstanceBean> instances = instanceRepo.findByJobId(job.getId()); + for (JobInstanceBean instance : instances) { + if (!instance.isPredicateDeleted()) { + deleteJob(instance.getPredicateGroup(), instance.getPredicateName()); + instance.setPredicateDeleted(true); + if (instance.getState().equals(LivySessionStates.State.FINDING)) { + instance.setState(LivySessionStates.State.NOT_FOUND); + } + } + } + } + + public void deleteJob(String group, String name) throws SchedulerException { + Scheduler scheduler = factory.getScheduler(); + JobKey jobKey = new JobKey(name, group); + if (!scheduler.checkExists(jobKey)) { + LOGGER.info("Job({},{}) does not exist.", jobKey.getGroup(), jobKey.getName()); + throw new GriffinException.NotFoundException(JOB_KEY_DOES_NOT_EXIST); + } + scheduler.deleteJob(jobKey); + + } + + private void pauseJob(String group, String name) throws SchedulerException { + if 
(StringUtils.isEmpty(group) || StringUtils.isEmpty(name)) { + return; + } + Scheduler scheduler = factory.getScheduler(); + JobKey jobKey = new JobKey(name, group); + if (!scheduler.checkExists(jobKey)) { + LOGGER.warn("Job({},{}) does not exist.", jobKey.getGroup(), jobKey.getName()); + throw new GriffinException.NotFoundException(JOB_KEY_DOES_NOT_EXIST); + } + scheduler.pauseJob(jobKey); + } + + public boolean pauseJobInstances(List<JobInstanceBean> instances) { + if (CollectionUtils.isEmpty(instances)) { + return true; + } + List<JobInstanceBean> deletedInstances = new ArrayList<>(); + boolean pauseStatus = true; + for (JobInstanceBean instance : instances) { + boolean status = pauseJobInstance(instance, deletedInstances); + pauseStatus = pauseStatus && status; + } + instanceRepo.save(deletedInstances); + return pauseStatus; + } + + private boolean pauseJobInstance(JobInstanceBean instance, List<JobInstanceBean> deletedInstances) { + boolean status = true; + String pGroup = instance.getPredicateGroup(); + String pName = instance.getPredicateName(); + try { + if (!instance.isPredicateDeleted()) { + deleteJob(pGroup, pName); + instance.setPredicateDeleted(true); + deletedInstances.add(instance); + } + } catch (SchedulerException e) { + LOGGER.error("Failed to pause predicate job({},{}).", pGroup, pName); + status = false; + } + return status; + } + + private void validateParams(JobSchedule js, GriffinMeasure measure) { + if (!jobService.isValidJobName(js.getJobName())) { + throw new GriffinException.BadRequestException(INVALID_JOB_NAME); + } + if (!isValidCronExpression(js.getCronExpression())) { + throw new GriffinException.BadRequestException(INVALID_CRON_EXPRESSION); + } + if (!isValidBaseLine(js.getSegments())) { + throw new GriffinException.BadRequestException(MISSING_BASELINE_CONFIG); + } + List<String> names = getConnectorNames(measure); + if (!isValidConnectorNames(js.getSegments(), names)) { + throw new 
GriffinException.BadRequestException(INVALID_CONNECTOR_NAME); + } + } + + private boolean isValidCronExpression(String cronExpression) { + if (StringUtils.isEmpty(cronExpression)) { + LOGGER.warn("Cron Expression is empty."); + return false; + } + if (!isValidExpression(cronExpression)) { + LOGGER.warn("Cron Expression is invalid."); + return false; + } + return true; + } + + private boolean isValidBaseLine(List<JobDataSegment> segments) { + assert segments != null; + for (JobDataSegment jds : segments) { + if (jds.isBaseline()) { + return true; + } + } + LOGGER.warn("Please set segment timestamp baseline in as.baseline field."); + return false; + } + + private boolean isValidConnectorNames(List<JobDataSegment> segments, List<String> names) { + assert segments != null; + Set<String> sets = new HashSet<>(); + for (JobDataSegment segment : segments) { + String dcName = segment.getDataConnectorName(); + sets.add(dcName); + boolean exist = names.stream().anyMatch(name -> name.equals(dcName)); + if (!exist) { + LOGGER.warn("Param {} is a illegal string. 
Please input one of strings in {}.", dcName, names); + return false; + } + } + if (sets.size() < segments.size()) { + LOGGER.warn("Connector names in job data segment cannot duplicate."); + return false; + } + return true; + } + + private List<String> getConnectorNames(GriffinMeasure measure) { + Set<String> sets = new HashSet<>(); + List<DataSource> sources = measure.getDataSources(); + for (DataSource source : sources) { + source.getConnectors().forEach(dc -> sets.add(dc.getName())); + } + if (sets.size() < sources.size()) { + LOGGER.warn("Connector names cannot be repeated."); + return Collections.emptyList(); + } + return new ArrayList<>(sets); + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java b/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java index 7ee8642..19733be 100644 --- a/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java +++ b/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java @@ -46,12 +46,12 @@ public class FileExistPredicator implements Predicator { @Override public boolean predicate() throws IOException { - Map<String, String> config = predicate.getConfigMap(); + Map<String, Object> config = predicate.getConfigMap(); String[] paths = null; String rootPath = null; - if (config != null && !StringUtils.isEmpty(config.get(PREDICT_PATH))) { - paths = config.get(PREDICT_PATH).split(PATH_CONNECTOR_CHARACTER); - rootPath = config.get(PREDICT_ROOT_PATH); + if (config != null && !StringUtils.isEmpty((String) config.get(PREDICT_PATH))) { + paths = ((String)config.get(PREDICT_PATH)).split(PATH_CONNECTOR_CHARACTER); + rootPath = (String) config.get(PREDICT_ROOT_PATH); } if (ArrayUtils.isEmpty(paths) || StringUtils.isEmpty(rootPath)) 
{ LOGGER.error("Predicate path is null.Please check predicates config root.path and path."); http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/JobController.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobController.java b/service/src/main/java/org/apache/griffin/core/job/JobController.java index 81b7fb7..5f9b319 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobController.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobController.java @@ -24,6 +24,7 @@ import org.apache.griffin.core.job.entity.JobHealth; import org.apache.griffin.core.job.entity.JobInstanceBean; import org.apache.griffin.core.job.entity.JobSchedule; import org.apache.griffin.core.util.FSUtil; +import org.quartz.SchedulerException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.io.InputStreamResource; import org.springframework.core.io.Resource; @@ -43,30 +44,41 @@ public class JobController { private JobService jobService; @RequestMapping(value = "/jobs", method = RequestMethod.GET) - public List<JobDataBean> getJobs() { - return jobService.getAliveJobs(); + public List<JobDataBean> getJobs(@RequestParam(value = "type", defaultValue = "") String type) { + return jobService.getAliveJobs(type); } - @RequestMapping(value = "/jobs/config/{jobName}") - public JobSchedule getJobSchedule(@PathVariable("jobName") String jobName) { + @RequestMapping(value = "/jobs/config", method = RequestMethod.GET) + public JobSchedule getJobSchedule(@RequestParam("jobName") String jobName) { return jobService.getJobSchedule(jobName); } + @RequestMapping(value = "/jobs/config/{jobId}") + public JobSchedule getJobSchedule(@PathVariable("jobId") Long jobId) { + return jobService.getJobSchedule(jobId); + } + @RequestMapping(value = "/jobs", method = RequestMethod.POST) 
@ResponseStatus(HttpStatus.CREATED) public JobSchedule addJob(@RequestBody JobSchedule jobSchedule) throws Exception { return jobService.addJob(jobSchedule); } + @RequestMapping(value = "/jobs/{id}", method = RequestMethod.PUT) + @ResponseStatus(HttpStatus.OK) + public JobDataBean onActions(@PathVariable("id") Long jobId, @RequestParam String action) throws Exception { + return jobService.onAction(jobId,action); + } + @RequestMapping(value = "/jobs", method = RequestMethod.DELETE) @ResponseStatus(HttpStatus.NO_CONTENT) - public void deleteJob(@RequestParam("jobName") String jobName) { + public void deleteJob(@RequestParam("jobName") String jobName) throws SchedulerException { jobService.deleteJob(jobName); } @RequestMapping(value = "/jobs/{id}", method = RequestMethod.DELETE) @ResponseStatus(HttpStatus.NO_CONTENT) - public void deleteJob(@PathVariable("id") Long id) { + public void deleteJob(@PathVariable("id") Long id) throws SchedulerException { jobService.deleteJob(id); } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/JobInstance.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java index 00becda..047e581 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java @@ -19,17 +19,16 @@ under the License. 
package org.apache.griffin.core.job; -import com.fasterxml.jackson.core.JsonProcessingException; import org.apache.commons.lang.StringUtils; +import org.apache.griffin.core.exception.GriffinException; import org.apache.griffin.core.job.entity.*; -import org.apache.griffin.core.job.repo.GriffinJobRepo; import org.apache.griffin.core.job.repo.JobInstanceRepo; -import org.apache.griffin.core.job.repo.JobScheduleRepo; +import org.apache.griffin.core.job.repo.JobRepo; import org.apache.griffin.core.measure.entity.DataConnector; import org.apache.griffin.core.measure.entity.DataSource; import org.apache.griffin.core.measure.entity.GriffinMeasure; +import org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType; import org.apache.griffin.core.measure.repo.GriffinMeasureRepo; -import org.apache.griffin.core.util.JsonUtil; import org.apache.griffin.core.util.TimeUtil; import org.quartz.*; import org.slf4j.Logger; @@ -37,12 +36,18 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.quartz.SchedulerFactoryBean; +import org.springframework.transaction.annotation.Transactional; import java.io.IOException; import java.util.*; +import static org.apache.griffin.core.exception.GriffinExceptionMessage.QUARTZ_JOB_ALREADY_EXIST; import static org.apache.griffin.core.job.JobServiceImpl.GRIFFIN_JOB_ID; -import static org.apache.griffin.core.job.JobServiceImpl.JOB_SCHEDULE_ID; +import static org.apache.griffin.core.job.entity.LivySessionStates.State.FINDING; +import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH; +import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.STREAMING; +import static org.apache.griffin.core.util.JsonUtil.toEntity; +import static org.apache.griffin.core.util.JsonUtil.toJson; import static org.quartz.JobBuilder.newJob; import static 
org.quartz.JobKey.jobKey; import static org.quartz.SimpleScheduleBuilder.simpleSchedule; @@ -64,9 +69,7 @@ public class JobInstance implements Job { @Autowired private GriffinMeasureRepo measureRepo; @Autowired - private GriffinJobRepo jobRepo; - @Autowired - private JobScheduleRepo jobScheduleRepo; + private JobRepo<AbstractJob> jobRepo; @Autowired private JobInstanceRepo instanceRepo; @Autowired @@ -75,12 +78,13 @@ public class JobInstance implements Job { private JobSchedule jobSchedule; private GriffinMeasure measure; - private GriffinJob griffinJob; + private AbstractJob job; private List<SegmentPredicate> mPredicates; private Long jobStartTime; @Override + @Transactional public void execute(JobExecutionContext context) { try { initParam(context); @@ -94,14 +98,12 @@ public class JobInstance implements Job { private void initParam(JobExecutionContext context) throws SchedulerException { mPredicates = new ArrayList<>(); JobDetail jobDetail = context.getJobDetail(); - Long jobScheduleId = jobDetail.getJobDataMap().getLong(JOB_SCHEDULE_ID); - Long griffinJobId = jobDetail.getJobDataMap().getLong(GRIFFIN_JOB_ID); - jobSchedule = jobScheduleRepo.findOne(jobScheduleId); + Long jobId = jobDetail.getJobDataMap().getLong(GRIFFIN_JOB_ID); + job = jobRepo.findOne(jobId); + jobSchedule = job.getJobSchedule(); Long measureId = jobSchedule.getMeasureId(); - griffinJob = jobRepo.findOne(griffinJobId); measure = measureRepo.findOne(measureId); setJobStartTime(jobDetail); - } @SuppressWarnings("unchecked") @@ -180,7 +182,7 @@ public class JobInstance implements Job { * @param dc data connector * @param sampleTs collection of data split start timestamp */ - private void setConnectorPredicates(DataConnector dc, Long[] sampleTs){ + private void setConnectorPredicates(DataConnector dc, Long[] sampleTs) { List<SegmentPredicate> predicates = dc.getPredicates(); for (SegmentPredicate predicate : predicates) { genConfMap(predicate.getConfigMap(), sampleTs, dc.getDataTimeZone()); @@ 
-202,21 +204,25 @@ public class JobInstance implements Job { * @return all config data combine,like {"where": "year=2017 AND month=11 AND dt=15 AND hour=09,year=2017 AND month=11 AND dt=15 AND hour=10"} * or like {"path": "/year=2017/month=11/dt=15/hour=09/_DONE,/year=2017/month=11/dt=15/hour=10/_DONE"} */ - private void genConfMap(Map<String, String> conf, Long[] sampleTs, String timezone) { + + private void genConfMap(Map<String, Object> conf, Long[] sampleTs, String timezone) { if (conf == null) { LOGGER.warn("Predicate config is null."); return; } - for (Map.Entry<String, String> entry : conf.entrySet()) { - String value = entry.getValue(); - Set<String> set = new HashSet<>(); - if (StringUtils.isEmpty(value)) { - continue; + for (Map.Entry<String, Object> entry : conf.entrySet()) { + // in case entry value is a json object instead of a string + if (entry.getValue() instanceof String) { + String value = (String) entry.getValue(); + Set<String> set = new HashSet<>(); + if (StringUtils.isEmpty(value)) { + continue; + } + for (Long timestamp : sampleTs) { + set.add(TimeUtil.format(value, timestamp, getTimeZone(timezone))); + } + conf.put(entry.getKey(), StringUtils.join(set, PATH_CONNECTOR_CHARACTER)); } - for (Long timestamp : sampleTs) { - set.add(TimeUtil.format(value, timestamp, getTimeZone(timezone))); - } - conf.put(entry.getKey(), StringUtils.join(set, PATH_CONNECTOR_CHARACTER)); } } @@ -228,50 +234,46 @@ public class JobInstance implements Job { } @SuppressWarnings("unchecked") - private boolean createJobInstance(Map<String, Object> confMap) throws Exception { + private void createJobInstance(Map<String, Object> confMap) throws Exception { Map<String, Object> config = (Map<String, Object>) confMap.get("checkdonefile.schedule"); Long interval = TimeUtil.str2Long((String) config.get("interval")); Integer repeat = Integer.valueOf(config.get("repeat").toString()); String groupName = "PG"; - String jobName = griffinJob.getJobName() + "_predicate_" + 
System.currentTimeMillis(); - Scheduler scheduler = factory.getScheduler(); - TriggerKey triggerKey = triggerKey(jobName, groupName); - return !(scheduler.checkExists(triggerKey) - || !saveJobInstance(jobName, groupName) - || !createJobInstance(triggerKey, interval, repeat, jobName)); + String jobName = job.getJobName() + "_predicate_" + System.currentTimeMillis(); + TriggerKey tk = triggerKey(jobName, groupName); + if (factory.getScheduler().checkExists(tk)) { + throw new GriffinException.ConflictException(QUARTZ_JOB_ALREADY_EXIST); + } + saveJobInstance(jobName, groupName); + createJobInstance(tk, interval, repeat, jobName); } - private boolean saveJobInstance(String pName, String pGroup) { + private void saveJobInstance(String pName, String pGroup) { + ProcessType type = measure.getProcessType() == BATCH ? BATCH : STREAMING; Long tms = System.currentTimeMillis(); Long expireTms = Long.valueOf(appConfProps.getProperty("jobInstance.expired.milliseconds")) + tms; - JobInstanceBean instance = new JobInstanceBean(LivySessionStates.State.finding, pName, pGroup, tms, expireTms); - instance.setGriffinJob(griffinJob); + JobInstanceBean instance = new JobInstanceBean(FINDING, pName, pGroup, tms, expireTms, type); + instance.setJob(job); instanceRepo.save(instance); - return true; } - private boolean createJobInstance(TriggerKey triggerKey, Long interval, Integer repeatCount, String pJobName) throws Exception { - JobDetail jobDetail = addJobDetail(triggerKey, pJobName); - factory.getScheduler().scheduleJob(newTriggerInstance(triggerKey, jobDetail, interval, repeatCount)); - return true; + + private void createJobInstance(TriggerKey tk, Long interval, Integer repeatCount, String pJobName) throws Exception { + JobDetail jobDetail = addJobDetail(tk, pJobName); + Trigger trigger = genTriggerInstance(tk, jobDetail, interval, repeatCount); + factory.getScheduler().scheduleJob(trigger); } - private Trigger newTriggerInstance(TriggerKey triggerKey, JobDetail jd, Long interval, 
Integer repeatCount) { - return newTrigger() - .withIdentity(triggerKey) - .forJob(jd) - .startNow() - .withSchedule(simpleSchedule() - .withIntervalInMilliseconds(interval) - .withRepeatCount(repeatCount) - ) + private Trigger genTriggerInstance(TriggerKey tk, JobDetail jd, Long interval, Integer repeatCount) { + return newTrigger().withIdentity(tk).forJob(jd).startNow() + .withSchedule(simpleSchedule().withIntervalInMilliseconds(interval).withRepeatCount(repeatCount)) .build(); } - private JobDetail addJobDetail(TriggerKey triggerKey, String pJobName) throws SchedulerException, JsonProcessingException { + private JobDetail addJobDetail(TriggerKey tk, String pJobName) throws SchedulerException, IOException { Scheduler scheduler = factory.getScheduler(); - JobKey jobKey = jobKey(triggerKey.getName(), triggerKey.getGroup()); + JobKey jobKey = jobKey(tk.getName(), tk.getGroup()); JobDetail jobDetail; Boolean isJobKeyExist = scheduler.checkExists(jobKey); if (isJobKeyExist) { @@ -287,12 +289,30 @@ public class JobInstance implements Job { return jobDetail; } - private void setJobDataMap(JobDetail jobDetail, String pJobName) throws JsonProcessingException { + private void setJobDataMap(JobDetail jobDetail, String pJobName) throws IOException { JobDataMap dataMap = jobDetail.getJobDataMap(); - dataMap.put(MEASURE_KEY, JsonUtil.toJson(measure)); - dataMap.put(PREDICATES_KEY, JsonUtil.toJson(mPredicates)); - dataMap.put(JOB_NAME, griffinJob.getJobName()); + preProcessMeasure(); + String result =toJson(measure); + dataMap.put(MEASURE_KEY, result); + dataMap.put(PREDICATES_KEY, toJson(mPredicates)); + dataMap.put(JOB_NAME, job.getJobName()); dataMap.put(PREDICATE_JOB_NAME, pJobName); } + private void preProcessMeasure() throws IOException { + for (DataSource source : measure.getDataSources()) { + Map cacheMap = source.getCacheMap(); + //to skip batch job + if (cacheMap == null) { + return; + } + String cache = toJson(cacheMap); + cache = cache.replaceAll("\\$\\{JOB_NAME}", 
job.getJobName()); + cache = cache.replaceAll("\\$\\{SOURCE_NAME}", source.getName()); + cache = cache.replaceAll("\\$\\{TARGET_NAME}", source.getName()); + cacheMap = toEntity(cache,Map.class); + source.setCacheMap(cacheMap); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/JobOperator.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobOperator.java b/service/src/main/java/org/apache/griffin/core/job/JobOperator.java new file mode 100644 index 0000000..40f1c9e --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/JobOperator.java @@ -0,0 +1,38 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ + +package org.apache.griffin.core.job; + +import org.apache.griffin.core.job.entity.*; +import org.apache.griffin.core.measure.entity.GriffinMeasure; +import org.quartz.SchedulerException; + +public interface JobOperator { + JobSchedule add(JobSchedule js, GriffinMeasure measure) throws Exception; + + void start(AbstractJob job) throws Exception; + + void stop(AbstractJob job) throws SchedulerException; + + void delete(AbstractJob job) throws SchedulerException; + + JobHealth getHealth(JobHealth jobHealth, AbstractJob job) throws SchedulerException; + + JobState getState(AbstractJob job, JobDataBean bean, String action) throws SchedulerException; +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/JobService.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobService.java b/service/src/main/java/org/apache/griffin/core/job/JobService.java index a238311..a2c4762 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobService.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobService.java @@ -19,27 +19,27 @@ under the License. 
package org.apache.griffin.core.job; -import org.apache.griffin.core.job.entity.JobDataBean; -import org.apache.griffin.core.job.entity.JobHealth; -import org.apache.griffin.core.job.entity.JobInstanceBean; -import org.apache.griffin.core.job.entity.JobSchedule; +import org.apache.griffin.core.job.entity.*; +import org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType; import org.quartz.SchedulerException; import java.util.List; public interface JobService { - List<JobDataBean> getAliveJobs(); + List<JobDataBean> getAliveJobs(String type); JobSchedule getJobSchedule(String jobName); - JobSchedule addJob(JobSchedule jobSchedule) throws Exception; + JobSchedule getJobSchedule(Long jobId); - void pauseJob(String group, String name) throws SchedulerException; + JobSchedule addJob(JobSchedule js) throws Exception; - void deleteJob(Long jobId); + JobDataBean onAction(Long jobId,String action) throws Exception; - void deleteJob(String jobName); + void deleteJob(Long jobId) throws SchedulerException; + + void deleteJob(String jobName) throws SchedulerException; List<JobInstanceBean> findInstancesOfJob(Long jobId, int page, int size); http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java index f42bc5c..dce6ae4 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java @@ -23,13 +23,15 @@ import com.fasterxml.jackson.core.type.TypeReference; import org.apache.commons.lang.StringUtils; import org.apache.griffin.core.exception.GriffinException; import org.apache.griffin.core.job.entity.*; -import org.apache.griffin.core.job.repo.GriffinJobRepo; +import 
org.apache.griffin.core.job.repo.BatchJobRepo; import org.apache.griffin.core.job.repo.JobInstanceRepo; -import org.apache.griffin.core.job.repo.JobScheduleRepo; -import org.apache.griffin.core.measure.entity.DataSource; +import org.apache.griffin.core.job.repo.JobRepo; +import org.apache.griffin.core.job.repo.StreamingJobRepo; import org.apache.griffin.core.measure.entity.GriffinMeasure; +import org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType; import org.apache.griffin.core.measure.repo.GriffinMeasureRepo; import org.apache.griffin.core.util.JsonUtil; +import org.apache.griffin.core.util.YarnNetUtil; import org.quartz.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,51 +43,57 @@ import org.springframework.data.domain.Sort; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.quartz.SchedulerFactoryBean; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; -import org.springframework.web.client.RestClientException; +import org.springframework.web.client.HttpClientErrorException; +import org.springframework.web.client.ResourceAccessException; import org.springframework.web.client.RestTemplate; -import java.io.IOException; import java.util.*; import static java.util.TimeZone.getTimeZone; import static org.apache.griffin.core.exception.GriffinExceptionMessage.*; -import static org.apache.griffin.core.job.entity.LivySessionStates.State.starting; -import static org.apache.griffin.core.job.entity.LivySessionStates.State.not_started; -import static org.apache.griffin.core.job.entity.LivySessionStates.State.recovering; -import static org.apache.griffin.core.job.entity.LivySessionStates.State.running; -import static org.apache.griffin.core.job.entity.LivySessionStates.State.idle; -import static org.apache.griffin.core.job.entity.LivySessionStates.State.busy; -import static 
org.quartz.CronExpression.isValidExpression; +import static org.apache.griffin.core.job.entity.LivySessionStates.State; +import static org.apache.griffin.core.job.entity.LivySessionStates.State.*; +import static org.apache.griffin.core.job.entity.LivySessionStates.isActive; +import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH; +import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.STREAMING; import static org.quartz.CronScheduleBuilder.cronSchedule; import static org.quartz.JobBuilder.newJob; import static org.quartz.JobKey.jobKey; +import static org.quartz.SimpleScheduleBuilder.simpleSchedule; import static org.quartz.TriggerBuilder.newTrigger; import static org.quartz.TriggerKey.triggerKey; -import static org.apache.griffin.core.job.entity.LivySessionStates.State; @Service public class JobServiceImpl implements JobService { private static final Logger LOGGER = LoggerFactory.getLogger(JobServiceImpl.class); - public static final String JOB_SCHEDULE_ID = "jobScheduleId"; public static final String GRIFFIN_JOB_ID = "griffinJobId"; private static final int MAX_PAGE_SIZE = 1024; private static final int DEFAULT_PAGE_SIZE = 10; + static final String START = "start"; + static final String STOP = "stop"; + private static final String BATCH_TYPE = "batch"; + private static final String STREAMING_TYPE = "streaming"; @Autowired private SchedulerFactoryBean factory; @Autowired - private JobInstanceRepo jobInstanceRepo; + private JobInstanceRepo instanceRepo; @Autowired @Qualifier("livyConf") private Properties livyConf; @Autowired private GriffinMeasureRepo measureRepo; @Autowired - private GriffinJobRepo jobRepo; + private BatchJobRepo batchJobRepo; + @Autowired + private StreamingJobRepo streamingJobRepo; + @Autowired + private JobRepo<AbstractJob> jobRepo; + @Autowired + private BatchJobOperatorImpl batchJobOp; @Autowired - private JobScheduleRepo jobScheduleRepo; + private StreamingJobOperatorImpl 
streamingJobOp; private RestTemplate restTemplate; @@ -94,332 +102,342 @@ public class JobServiceImpl implements JobService { } @Override - public List<JobDataBean> getAliveJobs() { + public List<JobDataBean> getAliveJobs(String type) { + List<? extends AbstractJob> jobs; + if (BATCH_TYPE.equals(type)) { + jobs = batchJobRepo.findByDeleted(false); + } else if (STREAMING_TYPE.equals(type)) { + jobs = streamingJobRepo.findByDeleted(false); + } else { + jobs = jobRepo.findByDeleted(false); + } + return getJobDataBeans(jobs); + } + + private List<JobDataBean> getJobDataBeans(List<? extends AbstractJob> jobs) { List<JobDataBean> dataList = new ArrayList<>(); try { - List<GriffinJob> jobs = jobRepo.findByDeleted(false); - for (GriffinJob job : jobs) { - JobDataBean jobData = genJobData(jobKey(job.getQuartzName(), job.getQuartzGroup()), job); + for (AbstractJob job : jobs) { + JobDataBean jobData = genJobDataBean(job); if (jobData != null) { dataList.add(jobData); } } } catch (SchedulerException e) { - LOGGER.error("Failed to get running jobs.", e); - throw new GriffinException.ServiceException("Failed to get running jobs.", e); + LOGGER.error("Failed to get RUNNING jobs.", e); + throw new GriffinException.ServiceException("Failed to get RUNNING jobs.", e); } return dataList; } - @SuppressWarnings("unchecked") - private JobDataBean genJobData(JobKey jobKey, GriffinJob job) throws SchedulerException { - Scheduler scheduler = factory.getScheduler(); - List<Trigger> triggers = (List<Trigger>) scheduler.getTriggersOfJob(jobKey); - if (CollectionUtils.isEmpty(triggers)) { - return null; - } - JobDataBean jobData = new JobDataBean(); - Trigger trigger = triggers.get(0); - setTriggerTime(trigger, jobData); - jobData.setJobId(job.getId()); - jobData.setJobName(job.getJobName()); - jobData.setMeasureId(job.getMeasureId()); - jobData.setTriggerState(scheduler.getTriggerState(trigger.getKey())); - jobData.setCronExpression(getCronExpression(triggers)); - return jobData; - } - - 
private String getCronExpression(List<Trigger> triggers) { - for (Trigger trigger : triggers) { - if (trigger instanceof CronTrigger) { - return ((CronTrigger) trigger).getCronExpression(); - } - } - return null; - } - - private void setTriggerTime(Trigger trigger, JobDataBean jobBean) { - Date nextFireTime = trigger.getNextFireTime(); - Date previousFireTime = trigger.getPreviousFireTime(); - jobBean.setNextFireTime(nextFireTime != null ? nextFireTime.getTime() : -1); - jobBean.setPreviousFireTime(previousFireTime != null ? previousFireTime.getTime() : -1); - } - @Override public JobSchedule getJobSchedule(String jobName) { - List<GriffinJob> jobs = jobRepo.findByJobNameAndDeleted(jobName, false); + List<AbstractJob> jobs = jobRepo.findByJobNameAndDeleted(jobName, false); if (jobs.size() == 0) { LOGGER.warn("Job name {} does not exist.", jobName); throw new GriffinException.NotFoundException(JOB_NAME_DOES_NOT_EXIST); } - return jobs.get(0).getJobSchedule(); + AbstractJob job = jobs.get(0); + return getJobSchedule(job); } @Override - @Transactional(rollbackFor = Exception.class) - public JobSchedule addJob(JobSchedule js) throws Exception { - Long measureId = js.getMeasureId(); - GriffinMeasure measure = getMeasureIfValid(measureId); - validateJobScheduleParams(js, measure); - String qName = getQuartzName(js); - String qGroup = getQuartzGroup(); - TriggerKey triggerKey = triggerKey(qName, qGroup); - if (factory.getScheduler().checkExists(triggerKey)) { - throw new GriffinException.ConflictException(QUARTZ_JOB_ALREADY_EXIST); + public JobSchedule getJobSchedule(Long jobId) { + AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false); + if (job == null) { + LOGGER.warn("Job id {} does not exist.", jobId); + throw new GriffinException.NotFoundException(JOB_ID_DOES_NOT_EXIST); } - GriffinJob job = new GriffinJob(measure.getId(), js.getJobName(), qName, qGroup, js,false); - job = jobRepo.save(job); - addJob(triggerKey, js, job); - return job.getJobSchedule(); + return 
getJobSchedule(job); } - private void addJob(TriggerKey triggerKey, JobSchedule js, GriffinJob job) throws Exception { - JobDetail jobDetail = addJobDetail(triggerKey, js, job); - factory.getScheduler().scheduleJob(genTriggerInstance(triggerKey, jobDetail, js)); + private JobSchedule getJobSchedule(AbstractJob job) { + JobSchedule jobSchedule = job.getJobSchedule(); + jobSchedule.setId(job.getId()); + return jobSchedule; } - private String getQuartzName(JobSchedule js) { - return js.getJobName() + "_" + System.currentTimeMillis(); + @Override + public JobSchedule addJob(JobSchedule js) throws Exception { + Long measureId = js.getMeasureId(); + GriffinMeasure measure = getMeasureIfValid(measureId); + JobOperator op = getJobOperator(measure.getProcessType()); + return op.add(js, measure); } - private String getQuartzGroup() { - return "BA"; + /** + * @param jobId job id + * @param action job operation: start job, stop job + */ + @Override + public JobDataBean onAction(Long jobId, String action) throws Exception { + AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false); + validateJobExist(job); + JobOperator op = getJobOperator(job); + doAction(action, job, op); + return genJobDataBean(job,action); } - private void validateJobScheduleParams(JobSchedule js, GriffinMeasure measure) { - if (!isValidJobName(js.getJobName())) { - throw new GriffinException.BadRequestException(INVALID_JOB_NAME); - } - if (!isValidCronExpression(js.getCronExpression())) { - throw new GriffinException.BadRequestException(INVALID_CRON_EXPRESSION); - } - if (!isValidBaseLine(js.getSegments())) { - throw new GriffinException.BadRequestException(MISSING_BASELINE_CONFIG); - } - List<String> names = getConnectorNames(measure); - if (!isValidConnectorNames(js.getSegments(), names)) { - throw new GriffinException.BadRequestException(INVALID_CONNECTOR_NAME); + private void doAction(String action, AbstractJob job, JobOperator op) throws Exception { + switch (action) { + case START: + 
op.start(job); + break; + case STOP: + op.stop(job); + break; + default: + throw new GriffinException.NotFoundException(NO_SUCH_JOB_ACTION); } } - private boolean isValidJobName(String jobName) { - if (StringUtils.isEmpty(jobName)) { - LOGGER.warn("Job name cannot be empty."); - return false; + + /** + * logically delete + * 1. pause these jobs + * 2. set these jobs as deleted status + * + * @param jobId griffin job id + */ + @Override + public void deleteJob(Long jobId) throws SchedulerException { + AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false); + validateJobExist(job); + JobOperator op = getJobOperator(job); + op.delete(job); + } + + /** + * logically delete + * + * @param name griffin job name which may not be unique. + */ + @Override + public void deleteJob(String name) throws SchedulerException { + List<AbstractJob> jobs = jobRepo.findByJobNameAndDeleted(name, false); + if (CollectionUtils.isEmpty(jobs)) { + LOGGER.warn("There is no job with '{}' name.", name); + throw new GriffinException.NotFoundException(JOB_NAME_DOES_NOT_EXIST); } - int size = jobRepo.countByJobNameAndDeleted(jobName, false); - if (size > 0) { - LOGGER.warn("Job name already exits."); - return false; + for (AbstractJob job : jobs) { + JobOperator op = getJobOperator(job); + op.delete(job); } - return true; } - private boolean isValidCronExpression(String cronExpression) { - if (StringUtils.isEmpty(cronExpression)) { - LOGGER.warn("Cron Expression is empty."); - return false; - } - if (!isValidExpression(cronExpression)) { - LOGGER.warn("Cron Expression is invalid."); - return false; + @Override + public List<JobInstanceBean> findInstancesOfJob(Long jobId, int page, int size) { + AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false); + if (job == null) { + LOGGER.warn("Job id {} does not exist.", jobId); + throw new GriffinException.NotFoundException(JOB_ID_DOES_NOT_EXIST); } - return true; + size = size > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : size; + size = size <= 0 ? 
DEFAULT_PAGE_SIZE : size; + Pageable pageable = new PageRequest(page, size, Sort.Direction.DESC, "tms"); + List<JobInstanceBean> instances = instanceRepo.findByJobId(jobId, pageable); + return updateState(instances); } - private boolean isValidBaseLine(List<JobDataSegment> segments) { - for (JobDataSegment jds : segments) { - if (jds.isBaseline()) { - return true; + private List<JobInstanceBean> updateState(List<JobInstanceBean> instances) { + for (JobInstanceBean instance : instances) { + State state = instance.getState(); + if (state.equals(UNKNOWN) || isActive(state)) { + syncInstancesOfJob(instance); } } - LOGGER.warn("Please set segment timestamp baseline in as.baseline field."); - return false; + return instances; } - private boolean isValidConnectorNames(List<JobDataSegment> segments, List<String> names) { - Set<String> sets = new HashSet<>(); - for (JobDataSegment segment : segments) { - String dcName = segment.getDataConnectorName(); - sets.add(dcName); - boolean exist = names.stream().anyMatch(name -> name.equals(dcName)); - if (!exist) { - LOGGER.warn("Param {} is a illegal string. Please input one of strings in {}.", dcName, names); - return false; + /** + * a job is regard as healthy job when its latest instance is in healthy state. + * + * @return job healthy statistics + */ + @Override + public JobHealth getHealthInfo() { + JobHealth jobHealth = new JobHealth(); + List<AbstractJob> jobs = jobRepo.findByDeleted(false); + for (AbstractJob job : jobs) { + JobOperator op = getJobOperator(job); + try { + jobHealth = op.getHealth(jobHealth, job); + } catch (SchedulerException e) { + LOGGER.error("Job schedule exception. 
{}", e.getMessage()); + throw new GriffinException.ServiceException("Fail to Get HealthInfo", e); } + } - if (sets.size() < segments.size()) { - LOGGER.warn("Connector names in job data segment cannot duplicate."); - return false; - } - return true; + return jobHealth; } - private List<String> getConnectorNames(GriffinMeasure measure) { - Set<String> sets = new HashSet<>(); - List<DataSource> sources = measure.getDataSources(); - for (DataSource source : sources) { - source.getConnectors().forEach(dc -> sets.add(dc.getName())); - } - if (sets.size() < sources.size()) { - LOGGER.warn("Connector names cannot be repeated."); - return Collections.emptyList(); + @Scheduled(fixedDelayString = "${jobInstance.expired.milliseconds}") + public void deleteExpiredJobInstance() { + Long timeMills = System.currentTimeMillis(); + List<JobInstanceBean> instances = instanceRepo.findByExpireTmsLessThanEqual(timeMills); + if (!batchJobOp.pauseJobInstances(instances)) { + LOGGER.error("Pause job failure."); + return; } - return new ArrayList<>(sets); + int count = instanceRepo.deleteByExpireTimestamp(timeMills); + LOGGER.info("Delete {} expired job instances.", count); } - private GriffinMeasure getMeasureIfValid(Long measureId) { - GriffinMeasure measure = measureRepo.findByIdAndDeleted(measureId, false); - if (measure == null) { - LOGGER.warn("The measure id {} isn't valid. 
Maybe it doesn't exist or is external measure type.", measureId); - throw new GriffinException.BadRequestException(INVALID_MEASURE_ID); + private void validateJobExist(AbstractJob job) { + if (job == null) { + LOGGER.warn("Griffin job does not exist."); + throw new GriffinException.NotFoundException(JOB_ID_DOES_NOT_EXIST); } - return measure; } - private Trigger genTriggerInstance(TriggerKey triggerKey, JobDetail jd, JobSchedule js) { - return newTrigger() - .withIdentity(triggerKey) - .forJob(jd) - .withSchedule(cronSchedule(js.getCronExpression()) - .inTimeZone(getTimeZone(js.getTimeZone())) - ) - .build(); + private JobOperator getJobOperator(AbstractJob job) { + if (job instanceof BatchJob) { + return batchJobOp; + } else if (job instanceof StreamingJob) { + return streamingJobOp; + } + throw new GriffinException.BadRequestException(JOB_TYPE_DOES_NOT_SUPPORT); } - private JobDetail addJobDetail(TriggerKey triggerKey, JobSchedule js, GriffinJob job) throws SchedulerException { - Scheduler scheduler = factory.getScheduler(); - JobKey jobKey = jobKey(triggerKey.getName(), triggerKey.getGroup()); - JobDetail jobDetail; - Boolean isJobKeyExist = scheduler.checkExists(jobKey); - if (isJobKeyExist) { - jobDetail = scheduler.getJobDetail(jobKey); - } else { - jobDetail = newJob(JobInstance.class).storeDurably().withIdentity(jobKey).build(); + private JobOperator getJobOperator(ProcessType type) { + if (type == BATCH) { + return batchJobOp; + } else if (type == STREAMING) { + return streamingJobOp; } - setJobDataMap(jobDetail, js, job); - scheduler.addJob(jobDetail, isJobKeyExist); - return jobDetail; + throw new GriffinException.BadRequestException(MEASURE_TYPE_DOES_NOT_SUPPORT); } + TriggerKey getTriggerKeyIfValid(String qName, String qGroup) throws SchedulerException { + TriggerKey triggerKey = triggerKey(qName, qGroup); + if (factory.getScheduler().checkExists(triggerKey)) { + throw new GriffinException.ConflictException(QUARTZ_JOB_ALREADY_EXIST); + } + return 
triggerKey; + } - private void setJobDataMap(JobDetail jd, JobSchedule js, GriffinJob job) { - JobDataMap jobDataMap = jd.getJobDataMap(); - jobDataMap.put(JOB_SCHEDULE_ID, js.getId().toString()); - jobDataMap.put(GRIFFIN_JOB_ID, job.getId().toString()); + List<? extends Trigger> getTriggers(String name, String group) throws SchedulerException { + JobKey jobKey = new JobKey(name, group); + Scheduler scheduler = factory.getScheduler(); + return scheduler.getTriggersOfJob(jobKey); } - private boolean pauseJob(List<JobInstanceBean> instances) { - if (CollectionUtils.isEmpty(instances)) { - return true; + private JobDataBean genJobDataBean(AbstractJob job, String action) throws SchedulerException { + if (job.getName() == null || job.getGroup() == null) { + return null; } - List<JobInstanceBean> deletedInstances = new ArrayList<>(); - boolean pauseStatus = true; - for (JobInstanceBean instance : instances) { - boolean status = pauseJob(instance, deletedInstances); - pauseStatus = pauseStatus && status; + JobDataBean jobData = new JobDataBean(); + List<? extends Trigger> triggers = getTriggers(job.getName(), job.getGroup()); + /* If triggers are empty, in Griffin it means job is not scheduled or completed whose trigger state is NONE. */ + if (CollectionUtils.isEmpty(triggers) && job instanceof BatchJob) { + return null; } - jobInstanceRepo.save(deletedInstances); - return pauseStatus; + setTriggerTime(triggers, jobData); + JobOperator op = getJobOperator(job); + JobState state = op.getState(job, jobData, action); + jobData.setJobState(state); + jobData.setJobId(job.getId()); + jobData.setJobName(job.getJobName()); + jobData.setMeasureId(job.getMeasureId()); + jobData.setCronExpression(getCronExpression(triggers)); + jobData.setProcessType(job instanceof BatchJob ? 
BATCH : STREAMING); + return jobData; } - private boolean pauseJob(JobInstanceBean instance, List<JobInstanceBean> deletedInstances) { - boolean status = true; - try { - pauseJob(instance.getPredicateGroup(), instance.getPredicateName()); - instance.setDeleted(true); - deletedInstances.add(instance); - } catch (SchedulerException e) { - LOGGER.error("Failed to pause predicate job({},{}).", instance.getId(), instance.getPredicateName()); - status = false; - } - return status; + private JobDataBean genJobDataBean(AbstractJob job) throws SchedulerException { + return genJobDataBean(job,null); } - @Override - public void pauseJob(String group, String name) throws SchedulerException { - Scheduler scheduler = factory.getScheduler(); - JobKey jobKey = new JobKey(name, group); - if (!scheduler.checkExists(jobKey)) { - LOGGER.warn("Job({},{}) does not exist.", jobKey.getGroup(), jobKey.getName()); + private void setTriggerTime(List<? extends Trigger> triggers, JobDataBean jobBean) { + if (CollectionUtils.isEmpty(triggers)) { return; } - scheduler.pauseJob(jobKey); - } - - private void setJobDeleted(GriffinJob job) { - job.setDeleted(true); - jobRepo.save(job); + Trigger trigger = triggers.get(0); + Date nextFireTime = trigger.getNextFireTime(); + Date previousFireTime = trigger.getPreviousFireTime(); + jobBean.setNextFireTime(nextFireTime != null ? nextFireTime.getTime() : -1); + jobBean.setPreviousFireTime(previousFireTime != null ? previousFireTime.getTime() : -1); } - private void deletePredicateJob(GriffinJob job) throws SchedulerException { - List<JobInstanceBean> instances = jobInstanceRepo.findByJobId(job.getId()); - for (JobInstanceBean instance : instances) { - if (!instance.isDeleted()) { - deleteJob(instance.getPredicateGroup(), instance.getPredicateName()); - instance.setDeleted(true); - if (instance.getState().equals(LivySessionStates.State.finding)) { - instance.setState(LivySessionStates.State.not_found); - } + private String getCronExpression(List<? 
extends Trigger> triggers) { + for (Trigger trigger : triggers) { + if (trigger instanceof CronTrigger) { + return ((CronTrigger) trigger).getCronExpression(); } } + return null; } - /** - * logically delete - * 1. pause these jobs - * 2. set these jobs as deleted status - * - * @param jobId griffin job id - */ - @Override - public void deleteJob(Long jobId) { - GriffinJob job = jobRepo.findByIdAndDeleted(jobId, false); - if (job == null) { - LOGGER.warn("Griffin job does not exist."); - throw new GriffinException.NotFoundException(JOB_ID_DOES_NOT_EXIST); - } - deleteJob(job); + void addJob(TriggerKey tk, JobSchedule js, AbstractJob job, ProcessType type) throws Exception { + JobDetail jobDetail = addJobDetail(tk, job); + Trigger trigger = genTriggerInstance(tk, jobDetail, js, type); + factory.getScheduler().scheduleJob(trigger); } - /** - * logically delete - * - * @param name griffin job name which may not be unique. - */ - @Override - public void deleteJob(String name) { - List<GriffinJob> jobs = jobRepo.findByJobNameAndDeleted(name, false); - if (CollectionUtils.isEmpty(jobs)) { - LOGGER.warn("There is no job with '{}' name.", name); - throw new GriffinException.NotFoundException(JOB_NAME_DOES_NOT_EXIST); + String getQuartzName(JobSchedule js) { + return js.getJobName() + "_" + System.currentTimeMillis(); + } + + String getQuartzGroup() { + return "BA"; + } + + boolean isValidJobName(String jobName) { + if (StringUtils.isEmpty(jobName)) { + LOGGER.warn("Job name cannot be empty."); + return false; } - for (GriffinJob job : jobs) { - deleteJob(job); + int size = jobRepo.countByJobNameAndDeleted(jobName, false); + if (size > 0) { + LOGGER.warn("Job name already exits."); + return false; } + return true; } - private void deleteJob(GriffinJob job) { - try { - pauseJob(job.getQuartzGroup(), job.getQuartzName()); - deletePredicateJob(job); - setJobDeleted(job); - } catch (Exception e) { - LOGGER.error("Failed to delete job", e); - throw new 
GriffinException.ServiceException("Failed to delete job", e); + + private GriffinMeasure getMeasureIfValid(Long measureId) { + GriffinMeasure measure = measureRepo.findByIdAndDeleted(measureId, false); + if (measure == null) { + LOGGER.warn("The measure id {} isn't valid. Maybe it doesn't exist or is external measure type.", measureId); + throw new GriffinException.BadRequestException(INVALID_MEASURE_ID); } + return measure; + } + + private Trigger genTriggerInstance(TriggerKey tk, JobDetail jd, JobSchedule js, ProcessType type) { + TriggerBuilder builder = newTrigger().withIdentity(tk).forJob(jd); + if (type == BATCH) { + TimeZone timeZone = getTimeZone(js.getTimeZone()); + return builder.withSchedule(cronSchedule(js.getCronExpression()).inTimeZone(timeZone)).build(); + } else if (type == STREAMING) { + return builder.startNow().withSchedule(simpleSchedule().withRepeatCount(0)).build(); + } + throw new GriffinException.BadRequestException(JOB_TYPE_DOES_NOT_SUPPORT); + } - private void deleteJob(String group, String name) throws SchedulerException { + private JobDetail addJobDetail(TriggerKey triggerKey, AbstractJob job) throws SchedulerException { Scheduler scheduler = factory.getScheduler(); - JobKey jobKey = new JobKey(name, group); - if (!scheduler.checkExists(jobKey)) { - LOGGER.info("Job({},{}) does not exist.", jobKey.getGroup(), jobKey.getName()); - return; + JobKey jobKey = jobKey(triggerKey.getName(), triggerKey.getGroup()); + JobDetail jobDetail; + Boolean isJobKeyExist = scheduler.checkExists(jobKey); + if (isJobKeyExist) { + jobDetail = scheduler.getJobDetail(jobKey); + } else { + jobDetail = newJob(JobInstance.class).storeDurably().withIdentity(jobKey).build(); } - scheduler.deleteJob(jobKey); + setJobDataMap(jobDetail, job); + scheduler.addJob(jobDetail, isJobKeyExist); + return jobDetail; } + private void setJobDataMap(JobDetail jd, AbstractJob job) { + JobDataMap jobDataMap = jd.getJobDataMap(); + jobDataMap.put(GRIFFIN_JOB_ID, 
job.getId().toString()); + } + + /** * deleteJobsRelateToMeasure * 1. search jobs related to measure @@ -427,143 +445,91 @@ public class JobServiceImpl implements JobService { * * @param measureId measure id */ - public void deleteJobsRelateToMeasure(Long measureId) { - List<GriffinJob> jobs = jobRepo.findByMeasureIdAndDeleted(measureId, false); + public void deleteJobsRelateToMeasure(Long measureId) throws SchedulerException { + List<AbstractJob> jobs = jobRepo.findByMeasureIdAndDeleted(measureId, false); if (CollectionUtils.isEmpty(jobs)) { LOGGER.info("Measure id {} has no related jobs.", measureId); return; } - for (GriffinJob job : jobs) { - deleteJob(job); - } - } - - @Override - public List<JobInstanceBean> findInstancesOfJob(Long jobId, int page, int size) { - AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false); - if (job == null) { - LOGGER.warn("Job id {} does not exist.", jobId); - throw new GriffinException.NotFoundException(JOB_ID_DOES_NOT_EXIST); - } - size = size > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : size; - size = size <= 0 ? 
DEFAULT_PAGE_SIZE : size; - Pageable pageable = new PageRequest(page, size, Sort.Direction.DESC, "tms"); - return jobInstanceRepo.findByJobId(jobId, pageable); - } - - @Scheduled(fixedDelayString = "${jobInstance.expired.milliseconds}") - public void deleteExpiredJobInstance() { - Long timeMills = System.currentTimeMillis(); - List<JobInstanceBean> instances = jobInstanceRepo.findByExpireTmsLessThanEqual(timeMills); - if (!pauseJob(instances)) { - LOGGER.error("Pause job failure."); - return; + for (AbstractJob job : jobs) { + JobOperator op = getJobOperator(job); + op.delete(job); } - jobInstanceRepo.deleteByExpireTimestamp(timeMills); - LOGGER.info("Delete expired job instances success."); } @Scheduled(fixedDelayString = "${jobInstance.fixedDelay.in.milliseconds}") public void syncInstancesOfAllJobs() { - State[] states = {starting, not_started, recovering, idle, running, busy}; - List<JobInstanceBean> beans = jobInstanceRepo.findByActiveState(states); - if (!CollectionUtils.isEmpty(beans)) { - for (JobInstanceBean jobInstance : beans) { - syncInstancesOfJob(jobInstance); - } + LivySessionStates.State[] states = {STARTING, NOT_STARTED, RECOVERING, IDLE, RUNNING, BUSY}; + List<JobInstanceBean> beans = instanceRepo.findByActiveState(states); + for (JobInstanceBean jobInstance : beans) { + syncInstancesOfJob(jobInstance); } } /** * call livy to update part of job instance table data associated with group and jobName in mysql. 
* - * @param jobInstance job instance livy info + * @param instance job instance livy info */ - private void syncInstancesOfJob(JobInstanceBean jobInstance) { - String uri = livyConf.getProperty("livy.uri") + "/" + jobInstance.getSessionId(); + private void syncInstancesOfJob(JobInstanceBean instance) { + if (instance.getSessionId() == null) { + return; + } + String uri = livyConf.getProperty("livy.uri") + "/" + instance.getSessionId(); TypeReference<HashMap<String, Object>> type = new TypeReference<HashMap<String, Object>>() { }; try { String resultStr = restTemplate.getForObject(uri, String.class); HashMap<String, Object> resultMap = JsonUtil.toEntity(resultStr, type); - setJobInstanceIdAndUri(jobInstance, resultMap); - } catch (RestClientException e) { - LOGGER.warn("Spark session {} has overdue, set state as unknown!\n {}", jobInstance.getSessionId(), e.getMessage()); - setJobInstanceUnknownStatus(jobInstance); - } catch (IOException e) { - LOGGER.error("Job instance json converts to map failed. {}", e.getMessage()); - } catch (IllegalArgumentException e) { - LOGGER.error("Livy status is illegal. {}", e.getMessage()); + setJobInstanceIdAndUri(instance, resultMap); + } catch (ResourceAccessException e) { + LOGGER.error("Your url may be wrong. Please check {}.\n {}", uri, e.getMessage()); + } catch (HttpClientErrorException e) { + LOGGER.warn("sessionId({}) appId({}) {}.", instance.getSessionId(), instance.getAppId(), e.getMessage()); + setStateByYarn(instance, e); } catch (Exception e) { - LOGGER.error("Sync job instances failure. 
{}", e.getMessage()); - } - } - - private void setJobInstanceIdAndUri(JobInstanceBean instance, HashMap<String, Object> resultMap) { - if (resultMap != null && resultMap.size() != 0 && resultMap.get("state") != null) { - instance.setState(LivySessionStates.State.valueOf(resultMap.get("state").toString())); - if (resultMap.get("appId") != null) { - String appId = String.valueOf(resultMap.get("appId")); - String appUri = livyConf.getProperty("spark.uri") + "/cluster/app/" + appId; - instance.setAppId(appId); - instance.setAppUri(appUri); - } - jobInstanceRepo.save(instance); - + LOGGER.error(e.getMessage()); } } - private void setJobInstanceUnknownStatus(JobInstanceBean jobInstance) { - //if server cannot get session from Livy, set State as unknown. - jobInstance.setState(LivySessionStates.State.unknown); - jobInstanceRepo.save(jobInstance); - } - - /** - * a job is regard as healthy job when its latest instance is in healthy state. - * - * @return job healthy statistics - */ - @Override - public JobHealth getHealthInfo() { - JobHealth jobHealth = new JobHealth(); - List<GriffinJob> jobs = jobRepo.findByDeleted(false); - for (GriffinJob job : jobs) { - jobHealth = getHealthInfo(jobHealth, job); + private void setStateByYarn(JobInstanceBean instance, HttpClientErrorException e) { + int code = e.getStatusCode().value(); + boolean match = (code == 400 || code == 404) && instance.getAppId() != null; + //this means your url is correct,but your param is wrong or livy session may be overdue. + if (match) { + setStateByYarn(instance); } - return jobHealth; } - private JobHealth getHealthInfo(JobHealth jobHealth, GriffinJob job) { - List<Trigger> triggers = getTriggers(job); - if (!CollectionUtils.isEmpty(triggers)) { - jobHealth.setJobCount(jobHealth.getJobCount() + 1); - if (isJobHealthy(job.getId())) { - jobHealth.setHealthyJobCount(jobHealth.getHealthyJobCount() + 1); + private void setStateByYarn(JobInstanceBean instance) { + LOGGER.warn("Spark session {} may be overdue! 
Now we use yarn to update state.", instance.getSessionId()); + String yarnUrl = livyConf.getProperty("spark.uri"); + boolean success = YarnNetUtil.update(yarnUrl, instance); + if (!success) { + if (instance.getState().equals(UNKNOWN)) { + return; } + instance.setState(UNKNOWN); } - return jobHealth; + instanceRepo.save(instance); } - @SuppressWarnings("unchecked") - private List<Trigger> getTriggers(GriffinJob job) { - JobKey jobKey = new JobKey(job.getQuartzName(), job.getQuartzGroup()); - List<Trigger> triggers; - try { - triggers = (List<Trigger>) factory.getScheduler().getTriggersOfJob(jobKey); - } catch (SchedulerException e) { - LOGGER.error("Job schedule exception. {}", e.getMessage()); - throw new GriffinException.ServiceException("Fail to Get HealthInfo", e); + + private void setJobInstanceIdAndUri(JobInstanceBean instance, HashMap<String, Object> resultMap) { + if (resultMap != null) { + Object state = resultMap.get("state"); + Object appId = resultMap.get("appId"); + instance.setState(state == null ? null : LivySessionStates.State.valueOf(state.toString().toUpperCase())); + instance.setAppId(appId == null ? null : appId.toString()); + instance.setAppUri(appId == null ? null : livyConf.getProperty("spark.uri") + "/cluster/app/" + appId); + instanceRepo.save(instance); } - return triggers; } - private Boolean isJobHealthy(Long jobId) { + public Boolean isJobHealthy(Long jobId) { Pageable pageable = new PageRequest(0, 1, Sort.Direction.DESC, "tms"); - List<JobInstanceBean> instances = jobInstanceRepo.findByJobId(jobId, pageable); + List<JobInstanceBean> instances = instanceRepo.findByJobId(jobId, pageable); return !CollectionUtils.isEmpty(instances) && LivySessionStates.isHealthy(instances.get(0).getState()); } - - }
