http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java index b878854..90c18ec 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java @@ -23,34 +23,38 @@ import com.fasterxml.jackson.core.type.TypeReference; import org.apache.commons.lang.StringUtils; import org.apache.griffin.core.error.exception.GriffinException.GetHealthInfoFailureException; import org.apache.griffin.core.error.exception.GriffinException.GetJobsFailureException; -import org.apache.griffin.core.job.entity.JobHealth; -import org.apache.griffin.core.job.entity.JobInstance; -import org.apache.griffin.core.job.entity.JobRequestBody; -import org.apache.griffin.core.job.entity.LivySessionStates; +import org.apache.griffin.core.job.entity.*; +import org.apache.griffin.core.job.repo.GriffinJobRepo; import org.apache.griffin.core.job.repo.JobInstanceRepo; +import org.apache.griffin.core.job.repo.JobScheduleRepo; +import org.apache.griffin.core.measure.entity.DataSource; +import org.apache.griffin.core.measure.entity.GriffinMeasure; import org.apache.griffin.core.measure.entity.Measure; -import org.apache.griffin.core.measure.repo.MeasureRepo; +import org.apache.griffin.core.measure.repo.GriffinMeasureRepo; import org.apache.griffin.core.util.GriffinOperationMessage; import org.apache.griffin.core.util.JsonUtil; import org.quartz.*; -import org.quartz.impl.matchers.GroupMatcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import 
org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Sort; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.quartz.SchedulerFactoryBean; import org.springframework.stereotype.Service; +import org.springframework.transaction.interceptor.TransactionAspectSupport; +import org.springframework.util.CollectionUtils; import org.springframework.web.client.RestClientException; import org.springframework.web.client.RestTemplate; import java.io.IOException; -import java.io.Serializable; +import java.text.ParseException; import java.util.*; -import static org.apache.griffin.core.util.GriffinOperationMessage.*; +import static org.apache.griffin.core.util.GriffinOperationMessage.CREATE_JOB_FAIL; +import static org.apache.griffin.core.util.GriffinOperationMessage.CREATE_JOB_SUCCESS; import static org.quartz.JobBuilder.newJob; import static org.quartz.JobKey.jobKey; import static org.quartz.TriggerBuilder.newTrigger; @@ -59,200 +63,305 @@ import static org.quartz.TriggerKey.triggerKey; @Service public class JobServiceImpl implements JobService { private static final Logger LOGGER = LoggerFactory.getLogger(JobServiceImpl.class); + static final String JOB_SCHEDULE_ID = "jobScheduleId"; + static final String GRIFFIN_JOB_ID = "griffinJobId"; + static final int MAX_PAGE_SIZE = 1024; + static final int DEFAULT_PAGE_SIZE = 10; @Autowired private SchedulerFactoryBean factory; @Autowired private JobInstanceRepo jobInstanceRepo; @Autowired - private Properties sparkJobProps; + @Qualifier("livyConf") + private Properties livyConf; @Autowired - private MeasureRepo measureRepo; + private GriffinMeasureRepo measureRepo; + @Autowired + private GriffinJobRepo jobRepo; + @Autowired + private JobScheduleRepo jobScheduleRepo; private RestTemplate restTemplate; - public JobServiceImpl() { restTemplate = new RestTemplate(); } @Override - public List<Map<String, 
Serializable>> getAliveJobs() { + public List<JobDataBean> getAliveJobs() { Scheduler scheduler = factory.getObject(); - List<Map<String, Serializable>> list = new ArrayList<>(); + List<JobDataBean> dataList = new ArrayList<>(); try { - for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.anyGroup())) { - Map jobInfoMap = getJobInfoMap(scheduler, jobKey); - if (jobInfoMap.size() != 0 && !isJobDeleted(scheduler, jobKey)) { - list.add(jobInfoMap); + List<GriffinJob> jobs = jobRepo.findByDeleted(false); + for (GriffinJob job : jobs) { + JobDataBean jobData = genJobData(scheduler, jobKey(job.getQuartzName(), job.getQuartzGroup()), job); + if (jobData != null) { + dataList.add(jobData); } } - } catch (SchedulerException e) { - LOGGER.error("failed to get running jobs.{}", e.getMessage()); + } catch (Exception e) { + LOGGER.error("Failed to get running jobs.", e); throw new GetJobsFailureException(); } - return list; + return dataList; } - private boolean isJobDeleted(Scheduler scheduler, JobKey jobKey) throws SchedulerException { - JobDataMap jobDataMap = scheduler.getJobDetail(jobKey).getJobDataMap(); - return jobDataMap.getBooleanFromString("deleted"); - } - - private Map getJobInfoMap(Scheduler scheduler, JobKey jobKey) throws SchedulerException { + private JobDataBean genJobData(Scheduler scheduler, JobKey jobKey, GriffinJob job) throws SchedulerException { List<Trigger> triggers = (List<Trigger>) scheduler.getTriggersOfJob(jobKey); - Map<String, Serializable> jobInfoMap = new HashMap<>(); - if (triggers == null || triggers.size() == 0) { - return jobInfoMap; - } - JobDetail jd = scheduler.getJobDetail(jobKey); - Date nextFireTime = triggers.get(0).getNextFireTime(); - Date previousFireTime = triggers.get(0).getPreviousFireTime(); - Trigger.TriggerState triggerState = scheduler.getTriggerState(triggers.get(0).getKey()); - - jobInfoMap.put("jobName", jobKey.getName()); - jobInfoMap.put("groupName", jobKey.getGroup()); - if (nextFireTime != null) { - 
jobInfoMap.put("nextFireTime", nextFireTime.getTime()); - } else { - jobInfoMap.put("nextFireTime", -1); - } - if (previousFireTime != null) { - jobInfoMap.put("previousFireTime", previousFireTime.getTime()); - } else { - jobInfoMap.put("previousFireTime", -1); + if (CollectionUtils.isEmpty(triggers)) { + return null; } - jobInfoMap.put("triggerState", triggerState); - jobInfoMap.put("measureId", jd.getJobDataMap().getString("measureId")); - jobInfoMap.put("sourcePattern", jd.getJobDataMap().getString("sourcePattern")); - jobInfoMap.put("targetPattern", jd.getJobDataMap().getString("targetPattern")); - if (StringUtils.isNotEmpty(jd.getJobDataMap().getString("blockStartTimestamp"))) { - jobInfoMap.put("blockStartTimestamp", jd.getJobDataMap().getString("blockStartTimestamp")); + JobDataBean jobData = new JobDataBean(); + Trigger trigger = triggers.get(0); + setTriggerTime(trigger, jobData); + jobData.setJobId(job.getId()); + jobData.setJobName(job.getJobName()); + jobData.setMeasureId(job.getMeasureId()); + jobData.setTriggerState(scheduler.getTriggerState(trigger.getKey())); + jobData.setCronExpression(getCronExpression(triggers)); + return jobData; + } + + private String getCronExpression(List<Trigger> triggers) { + for (Trigger trigger : triggers) { + if (trigger instanceof CronTrigger) { + return ((CronTrigger) trigger).getCronExpression(); + } } - jobInfoMap.put("jobStartTime", jd.getJobDataMap().getString("jobStartTime")); - jobInfoMap.put("interval", jd.getJobDataMap().getString("interval")); - return jobInfoMap; + return null; + } + + private void setTriggerTime(Trigger trigger, JobDataBean jobBean) throws SchedulerException { + Date nextFireTime = trigger.getNextFireTime(); + Date previousFireTime = trigger.getPreviousFireTime(); + jobBean.setNextFireTime(nextFireTime != null ? nextFireTime.getTime() : -1); + jobBean.setPreviousFireTime(previousFireTime != null ? 
previousFireTime.getTime() : -1); } @Override - public GriffinOperationMessage addJob(String groupName, String jobName, Long measureId, JobRequestBody jobRequestBody) { - int interval; - Date jobStartTime; + public GriffinOperationMessage addJob(JobSchedule js) { + Long measureId = js.getMeasureId(); + GriffinMeasure measure = getMeasureIfValid(measureId); + if (measure != null) { + return addJob(js, measure); + } + return CREATE_JOB_FAIL; + } + + private GriffinOperationMessage addJob(JobSchedule js, GriffinMeasure measure) { + String qName = js.getJobName() + "_" + System.currentTimeMillis(); + String qGroup = getQuartzGroupName(); try { - interval = Integer.parseInt(jobRequestBody.getInterval()); - jobStartTime = new Date(Long.parseLong(jobRequestBody.getJobStartTime())); - setJobStartTime(jobStartTime, interval); - - Scheduler scheduler = factory.getObject(); - TriggerKey triggerKey = triggerKey(jobName, groupName); - if (scheduler.checkExists(triggerKey)) { - LOGGER.error("the triggerKey({},{}) has been used.", jobName, groupName); - return CREATE_JOB_FAIL; + if (addJob(js, measure, qName, qGroup)) { + return CREATE_JOB_SUCCESS; } + } catch (Exception e) { + LOGGER.error("Add job exception happens.", e); + TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); + } + return CREATE_JOB_FAIL; + } + + private boolean addJob(JobSchedule js, GriffinMeasure measure, String qName, String qGroup) throws SchedulerException, ParseException { + Scheduler scheduler = factory.getObject(); + TriggerKey triggerKey = triggerKey(qName, qGroup); + if (!isJobScheduleParamValid(js, measure)) { + return false; + } + if (scheduler.checkExists(triggerKey)) { + return false; + } + GriffinJob job = saveGriffinJob(measure.getId(), js.getJobName(), qName, qGroup); + return job != null && saveAndAddQuartzJob(scheduler, triggerKey, js, job); + } + + private String getQuartzGroupName() { + return "BA"; + } + + private boolean isJobScheduleParamValid(JobSchedule js, 
GriffinMeasure measure) throws SchedulerException { + if (!isJobNameValid(js.getJobName())) { + return false; + } + if (!isBaseLineValid(js.getSegments())) { + return false; + } + List<String> names = getConnectorNames(measure); + return isConnectorNamesValid(js.getSegments(), names); + } + + private boolean isJobNameValid(String jobName) { + if (StringUtils.isEmpty(jobName)) { + LOGGER.warn("Job name cannot be empty."); + return false; + } + int size = jobRepo.countByJobNameAndDeleted(jobName, false); + if (size > 0) { + LOGGER.warn("Job name already exits."); + return false; + } + return true; + } - if (!isMeasureIdAvailable(measureId)) { - LOGGER.error("The measure id {} does't exist.", measureId); - return CREATE_JOB_FAIL; + private boolean isBaseLineValid(List<JobDataSegment> segments) { + for (JobDataSegment jds : segments) { + if (jds.getBaseline()) { + return true; } + } + LOGGER.warn("Please set segment timestamp baseline in as.baseline field."); + return false; + } - JobDetail jobDetail = addJobDetail(scheduler, groupName, jobName, measureId, jobRequestBody); - scheduler.scheduleJob(newTriggerInstance(triggerKey, jobDetail, interval, jobStartTime)); - return GriffinOperationMessage.CREATE_JOB_SUCCESS; - } catch (NumberFormatException e) { - LOGGER.info("jobStartTime or interval format error! {}", e.getMessage()); - return CREATE_JOB_FAIL; - } catch (SchedulerException e) { - LOGGER.error("SchedulerException when add job. 
{}", e.getMessage()); - return CREATE_JOB_FAIL; + private boolean isConnectorNamesValid(List<JobDataSegment> segments, List<String> names) { + for (JobDataSegment segment : segments) { + if (!isConnectorNameValid(segment.getDataConnectorName(), names)) { + return false; + } } + return true; } - private Boolean isMeasureIdAvailable(long measureId) { - Measure measure = measureRepo.findOne(measureId); - if (measure != null && !measure.getDeleted()) { - return true; + private boolean isConnectorNameValid(String param, List<String> names) { + for (String name : names) { + if (name.equals(param)) { + return true; + } } + LOGGER.warn("Param {} is a illegal string. Please input one of strings in {}.", param, names); return false; } - private JobDetail addJobDetail(Scheduler scheduler, String groupName, String jobName, Long measureId, JobRequestBody jobRequestBody) throws SchedulerException { - JobKey jobKey = jobKey(jobName, groupName); + private List<String> getConnectorNames(GriffinMeasure measure) { + List<String> names = new ArrayList<>(); + Set<String> sets = new HashSet<>(); + List<DataSource> sources = measure.getDataSources(); + for (DataSource source : sources) { + source.getConnectors().forEach(dc -> { + names.add(dc.getName()); /* keep every name, repeats included, so they can be counted */ + }); + } + sets.addAll(names); /* the set collapses repeats; a smaller set means a repeated name */ + if (sets.size() < names.size()) { + LOGGER.error("Connector names cannot be repeated."); + throw new IllegalArgumentException(); + } + return names; + } + + private GriffinMeasure getMeasureIfValid(Long measureId) { + Measure measure = measureRepo.findByIdAndDeleted(measureId, false); + if (measure == null) { + LOGGER.warn("The measure id {} isn't valid. 
Maybe it doesn't exist or is deleted.", measureId); + return null; + } + return (GriffinMeasure) measure; + } + + private GriffinJob saveGriffinJob(Long measureId, String jobName, String qName, String qGroup) { + GriffinJob job = new GriffinJob(measureId, jobName, qName, qGroup, false); + return jobRepo.save(job); + } + + private boolean saveAndAddQuartzJob(Scheduler scheduler, TriggerKey triggerKey, JobSchedule js, GriffinJob job) throws SchedulerException, ParseException { + js = jobScheduleRepo.save(js); + JobDetail jobDetail = addJobDetail(scheduler, triggerKey, js, job); + scheduler.scheduleJob(genTriggerInstance(triggerKey, jobDetail, js)); + return true; + } + + + private Trigger genTriggerInstance(TriggerKey triggerKey, JobDetail jd, JobSchedule js) throws ParseException { + return newTrigger() + .withIdentity(triggerKey) + .forJob(jd) + .withSchedule(CronScheduleBuilder.cronSchedule(new CronExpression(js.getCronExpression())) + .inTimeZone(TimeZone.getTimeZone(js.getTimeZone())) + ) + .build(); + } + + private JobDetail addJobDetail(Scheduler scheduler, TriggerKey triggerKey, JobSchedule js, GriffinJob job) throws SchedulerException { + JobKey jobKey = jobKey(triggerKey.getName(), triggerKey.getGroup()); JobDetail jobDetail; - if (scheduler.checkExists(jobKey)) { + Boolean isJobKeyExist = scheduler.checkExists(jobKey); + if (isJobKeyExist) { jobDetail = scheduler.getJobDetail(jobKey); - setJobData(jobDetail, jobRequestBody, measureId, groupName, jobName); - scheduler.addJob(jobDetail, true); } else { - jobDetail = newJob(SparkSubmitJob.class) - .storeDurably() - .withIdentity(jobKey) - .build(); - //set JobData - setJobData(jobDetail, jobRequestBody, measureId, groupName, jobName); - scheduler.addJob(jobDetail, false); + jobDetail = newJob(JobInstance.class).storeDurably().withIdentity(jobKey).build(); } + setJobDataMap(jobDetail, js, job); + scheduler.addJob(jobDetail, isJobKeyExist); return jobDetail; } - private Trigger newTriggerInstance(TriggerKey 
triggerKey, JobDetail jobDetail, int interval, Date jobStartTime) throws SchedulerException { - Trigger trigger = newTrigger() - .withIdentity(triggerKey) - .forJob(jobDetail) - .withSchedule(SimpleScheduleBuilder.simpleSchedule() - .withIntervalInSeconds(interval) - .repeatForever()) - .startAt(jobStartTime) - .build(); - return trigger; + + private void setJobDataMap(JobDetail jd, JobSchedule js, GriffinJob job) { + jd.getJobDataMap().put(JOB_SCHEDULE_ID, js.getId().toString()); + jd.getJobDataMap().put(GRIFFIN_JOB_ID, job.getId().toString()); } - private void setJobStartTime(Date jobStartTime, int interval) { - long currentTimestamp = System.currentTimeMillis(); - long jobStartTimestamp = jobStartTime.getTime(); - //if jobStartTime is before currentTimestamp, reset it with a future time - if (jobStartTime.before(new Date(currentTimestamp))) { - long n = (currentTimestamp - jobStartTimestamp) / (long) (interval * 1000); - jobStartTimestamp = jobStartTimestamp + (n + 1) * (long) (interval * 1000); - jobStartTime.setTime(jobStartTimestamp); + private boolean pauseJob(List<JobInstanceBean> instances) { + if (CollectionUtils.isEmpty(instances)) { + return true; + } + List<JobInstanceBean> deletedInstances = new ArrayList<>(); + boolean pauseStatus = true; + for (JobInstanceBean instance : instances) { + boolean status = pauseJob(instance, deletedInstances); + pauseStatus = pauseStatus && status; } + jobInstanceRepo.save(deletedInstances); + return pauseStatus; } - private void setJobData(JobDetail jobDetail, JobRequestBody jobRequestBody, Long measureId, String groupName, String jobName) { - jobDetail.getJobDataMap().put("groupName", groupName); - jobDetail.getJobDataMap().put("jobName", jobName); - jobDetail.getJobDataMap().put("measureId", measureId.toString()); - jobDetail.getJobDataMap().put("sourcePattern", jobRequestBody.getSourcePattern()); - jobDetail.getJobDataMap().put("targetPattern", jobRequestBody.getTargetPattern()); - 
jobDetail.getJobDataMap().put("blockStartTimestamp", jobRequestBody.getBlockStartTimestamp()); - jobDetail.getJobDataMap().put("jobStartTime", jobRequestBody.getJobStartTime()); - jobDetail.getJobDataMap().put("interval", jobRequestBody.getInterval()); - jobDetail.getJobDataMap().put("lastBlockStartTimestamp", ""); - jobDetail.getJobDataMap().putAsString("deleted", false); + private boolean pauseJob(JobInstanceBean instance, List<JobInstanceBean> deletedInstances) { + boolean status; + try { + status = pauseJob(instance.getPredicateGroup(), instance.getPredicateName()); + if (status) { + instance.setDeleted(true); + deletedInstances.add(instance); + } + } catch (SchedulerException e) { + LOGGER.error("Pause predicate job({},{}) failure.", instance.getId(), instance.getPredicateName()); + status = false; + } + return status; } @Override - public GriffinOperationMessage pauseJob(String group, String name) { - try { - Scheduler scheduler = factory.getObject(); - scheduler.pauseJob(new JobKey(name, group)); - return GriffinOperationMessage.PAUSE_JOB_SUCCESS; - } catch (SchedulerException | NullPointerException e) { - LOGGER.error("{} {}", GriffinOperationMessage.PAUSE_JOB_FAIL, e.getMessage()); - return GriffinOperationMessage.PAUSE_JOB_FAIL; + public boolean pauseJob(String group, String name) throws SchedulerException { + Scheduler scheduler = factory.getObject(); + JobKey jobKey = new JobKey(name, group); + if (!scheduler.checkExists(jobKey)) { + LOGGER.warn("Job({},{}) does not exist.", jobKey.getGroup(), jobKey.getName()); + return false; } + scheduler.pauseJob(jobKey); + return true; } - private GriffinOperationMessage setJobDeleted(String group, String name) { - try { - Scheduler scheduler = factory.getObject(); - JobDetail jobDetail = scheduler.getJobDetail(new JobKey(name, group)); - jobDetail.getJobDataMap().putAsString("deleted", true); - scheduler.addJob(jobDetail, true); - return GriffinOperationMessage.SET_JOB_DELETED_STATUS_SUCCESS; - } catch 
(SchedulerException | NullPointerException e) { - LOGGER.error("{} {}", GriffinOperationMessage.PAUSE_JOB_FAIL, e.getMessage()); - return GriffinOperationMessage.SET_JOB_DELETED_STATUS_FAIL; + private boolean setJobDeleted(GriffinJob job) throws SchedulerException { + job.setDeleted(true); + jobRepo.save(job); + return true; + } + + private boolean deletePredicateJob(GriffinJob job) throws SchedulerException { + boolean pauseStatus = true; + List<JobInstanceBean> instances = job.getJobInstances(); + for (JobInstanceBean instance : instances) { + if (!instance.getDeleted()) { + pauseStatus = pauseStatus && deleteJob(instance.getPredicateGroup(), instance.getPredicateName()); + instance.setDeleted(true); + if (instance.getState().equals(LivySessionStates.State.finding)) { + instance.setState(LivySessionStates.State.not_found); + } + } } + return pauseStatus; } /** @@ -260,18 +369,60 @@ public class JobServiceImpl implements JobService { * 1. pause these jobs * 2. set these jobs as deleted status * - * @param group job group name - * @param name job name + * @param jobId griffin job id + * @return custom information + */ + @Override + public GriffinOperationMessage deleteJob(Long jobId) { + GriffinJob job = jobRepo.findByIdAndDeleted(jobId, false); + return deleteJob(job) ? GriffinOperationMessage.DELETE_JOB_SUCCESS : GriffinOperationMessage.DELETE_JOB_FAIL; + } + + /** + * logically delete + * + * @param name griffin job name which may not be unique. 
* @return custom information */ @Override - public GriffinOperationMessage deleteJob(String group, String name) { - //logically delete - if (pauseJob(group, name).equals(PAUSE_JOB_SUCCESS) && - setJobDeleted(group, name).equals(SET_JOB_DELETED_STATUS_SUCCESS)) { - return GriffinOperationMessage.DELETE_JOB_SUCCESS; + public GriffinOperationMessage deleteJob(String name) { + List<GriffinJob> jobs = jobRepo.findByJobNameAndDeleted(name, false); + if (CollectionUtils.isEmpty(jobs)) { + LOGGER.warn("There is no job with '{}' name.", name); + return GriffinOperationMessage.DELETE_JOB_FAIL; } - return GriffinOperationMessage.DELETE_JOB_FAIL; + for (GriffinJob job : jobs) { + if (!deleteJob(job)) { + return GriffinOperationMessage.DELETE_JOB_FAIL; + } + } + return GriffinOperationMessage.DELETE_JOB_SUCCESS; + } + + private boolean deleteJob(GriffinJob job) { + if (job == null) { + LOGGER.warn("Griffin job does not exist."); + return false; + } + try { + if (pauseJob(job.getQuartzGroup(), job.getQuartzName()) && deletePredicateJob(job) && setJobDeleted(job)) { + return true; + } + } catch (Exception e) { + LOGGER.error("Delete job failure.", e); + } + return false; + } + + private boolean deleteJob(String group, String name) throws SchedulerException { + Scheduler scheduler = factory.getObject(); + JobKey jobKey = new JobKey(name, group); + if (!scheduler.checkExists(jobKey)) { /* same guard as pauseJob(group, name): report and bail out only when the job is absent */ + LOGGER.warn("Job({},{}) does not exist.", jobKey.getGroup(), jobKey.getName()); + return false; + } + scheduler.deleteJob(jobKey); + return true; } /** @@ -279,74 +430,62 @@ public class JobServiceImpl implements JobService { * 1. search jobs related to measure * 2.
deleteJob * - * @param measure measure data quality between source and target dataset - * @throws SchedulerException quartz throws if schedule has problem + * @param measureId measure id */ - public void deleteJobsRelateToMeasure(Measure measure) throws SchedulerException { - Scheduler scheduler = factory.getObject(); - //get all jobs - for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.anyGroup())) { - JobDetail jobDetail = scheduler.getJobDetail(jobKey); - JobDataMap jobDataMap = jobDetail.getJobDataMap(); - if (jobDataMap.getString("measureId").equals(measure.getId().toString())) { - //select jobs related to measureId - deleteJob(jobKey.getGroup(), jobKey.getName()); - LOGGER.info("{} {} is paused and logically deleted.", jobKey.getGroup(), jobKey.getName()); - } + public boolean deleteJobsRelateToMeasure(Long measureId) { + List<GriffinJob> jobs = jobRepo.findByMeasureIdAndDeleted(measureId, false); + if (CollectionUtils.isEmpty(jobs)) { + LOGGER.warn("Measure id {} has no related jobs.", measureId); + return false; + } + for (GriffinJob job : jobs) { + deleteJob(job); } + return true; } @Override - public List<JobInstance> findInstancesOfJob(String group, String jobName, int page, int size) { - try { - Scheduler scheduler = factory.getObject(); - JobKey jobKey = new JobKey(jobName, group); - if (!scheduler.checkExists(jobKey) || isJobDeleted(scheduler, jobKey)) { - return new ArrayList<>(); - } - } catch (SchedulerException e) { - LOGGER.error("Quartz schedule error. 
{}", e.getMessage()); + public List<JobInstanceBean> findInstancesOfJob(Long jobId, int page, int size) { + AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false); + if (job == null) { + LOGGER.warn("Job id {} does not exist.", jobId); return new ArrayList<>(); } - //query and return instances - Pageable pageRequest = new PageRequest(page, size, Sort.Direction.DESC, "timestamp"); - return jobInstanceRepo.findByGroupNameAndJobName(group, jobName, pageRequest); + size = size > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : size; + size = size <= 0 ? DEFAULT_PAGE_SIZE : size; + Pageable pageable = new PageRequest(page, size, Sort.Direction.DESC, "tms"); + return jobInstanceRepo.findByJobId(jobId, pageable); + } + + @Scheduled(fixedDelayString = "${jobInstance.expired.milliseconds}") + public void deleteExpiredJobInstance() { + List<JobInstanceBean> instances = jobInstanceRepo.findByExpireTmsLessThanEqual(System.currentTimeMillis()); + if (!pauseJob(instances)) { + LOGGER.error("Pause job failure."); + return; + } + jobInstanceRepo.deleteByExpireTimestamp(System.currentTimeMillis()); + LOGGER.info("Delete expired job instances success."); } @Scheduled(fixedDelayString = "${jobInstance.fixedDelay.in.milliseconds}") public void syncInstancesOfAllJobs() { - List<Object> groupJobList = jobInstanceRepo.findGroupWithJobName(); - for (Object groupJobObj : groupJobList) { - try { - Object[] groupJob = (Object[]) groupJobObj; - if (groupJob != null && groupJob.length == 2) { - syncInstancesOfJob(groupJob[0].toString(), groupJob[1].toString()); - } - } catch (Exception e) { - LOGGER.error("schedule update instances of all jobs failed. {}", e.getMessage()); + List<JobInstanceBean> beans = jobInstanceRepo.findByActiveState(); + if (!CollectionUtils.isEmpty(beans)) { + for (JobInstanceBean jobInstance : beans) { + syncInstancesOfJob(jobInstance); } } } + /** - * call livy to update part of jobInstance table data associated with group and jobName in mysql. 
+ * call livy to update part of job instance table data associated with group and jobName in mysql. * - * @param group group name of jobInstance - * @param jobName job name of jobInstance + * @param jobInstance job instance livy info */ - private void syncInstancesOfJob(String group, String jobName) { - //update all instance info belongs to this group and job. - List<JobInstance> jobInstanceList = jobInstanceRepo.findByGroupNameAndJobName(group, jobName); - for (JobInstance jobInstance : jobInstanceList) { - if (LivySessionStates.isActive(jobInstance.getState())) { - String uri = sparkJobProps.getProperty("livy.uri") + "/" + jobInstance.getSessionId(); - setJobInstanceInfo(jobInstance, uri, group, jobName); - } - - } - } - - private void setJobInstanceInfo(JobInstance jobInstance, String uri, String group, String jobName) { + private void syncInstancesOfJob(JobInstanceBean jobInstance) { + String uri = livyConf.getProperty("livy.uri") + "/" + jobInstance.getSessionId(); TypeReference<HashMap<String, Object>> type = new TypeReference<HashMap<String, Object>>() { }; try { @@ -354,28 +493,31 @@ public class JobServiceImpl implements JobService { HashMap<String, Object> resultMap = JsonUtil.toEntity(resultStr, type); setJobInstanceIdAndUri(jobInstance, resultMap); } catch (RestClientException e) { - LOGGER.error("spark session {} has overdue, set state as unknown!\n {}", jobInstance.getSessionId(), e.getMessage()); + LOGGER.error("Spark session {} has overdue, set state as unknown!\n {}", jobInstance.getSessionId(), e.getMessage()); setJobInstanceUnknownStatus(jobInstance); } catch (IOException e) { - LOGGER.error("jobInstance jsonStr convert to map failed. {}", e.getMessage()); + LOGGER.error("Job instance json converts to map failed. {}", e.getMessage()); } catch (IllegalArgumentException e) { - LOGGER.warn("Livy status is illegal. {}", group, jobName, e.getMessage()); + LOGGER.error("Livy status is illegal. 
{}", e.getMessage()); } } - private void setJobInstanceIdAndUri(JobInstance jobInstance, HashMap<String, Object> resultMap) throws IllegalArgumentException { + + private void setJobInstanceIdAndUri(JobInstanceBean instance, HashMap<String, Object> resultMap) { if (resultMap != null && resultMap.size() != 0 && resultMap.get("state") != null) { - jobInstance.setState(LivySessionStates.State.valueOf(resultMap.get("state").toString())); + instance.setState(LivySessionStates.State.valueOf(resultMap.get("state").toString())); if (resultMap.get("appId") != null) { - jobInstance.setAppId(resultMap.get("appId").toString()); - jobInstance.setAppUri(sparkJobProps.getProperty("spark.uri") + "/cluster/app/" + resultMap.get("appId").toString()); + String appId = String.valueOf(resultMap.get("appId")); + String appUri = livyConf.getProperty("spark.uri") + "/cluster/app/" + appId; + instance.setAppId(appId); + instance.setAppUri(appUri); } - jobInstanceRepo.save(jobInstance); + jobInstanceRepo.save(instance); } } - private void setJobInstanceUnknownStatus(JobInstance jobInstance) { + private void setJobInstanceUnknownStatus(JobInstanceBean jobInstance) { //if server cannot get session from Livy, set State as unknown. 
jobInstance.setState(LivySessionStates.State.unknown); jobInstanceRepo.save(jobInstance); @@ -388,55 +530,42 @@ public class JobServiceImpl implements JobService { */ @Override public JobHealth getHealthInfo() { - Scheduler scheduler = factory.getObject(); - int jobCount = 0; - int notHealthyCount = 0; - try { - Set<JobKey> jobKeys = scheduler.getJobKeys(GroupMatcher.anyGroup()); - for (JobKey jobKey : jobKeys) { - List<Trigger> triggers = (List<Trigger>) scheduler.getTriggersOfJob(jobKey); - if (triggers != null && triggers.size() != 0 && !isJobDeleted(scheduler, jobKey)) { - jobCount++; - notHealthyCount = getJobNotHealthyCount(notHealthyCount, jobKey); - } - } - } catch (SchedulerException e) { - LOGGER.error(e.getMessage()); - throw new GetHealthInfoFailureException(); + JobHealth jobHealth = new JobHealth(); + List<GriffinJob> jobs = jobRepo.findByDeleted(false); + for (GriffinJob job : jobs) { + jobHealth = getHealthInfo(jobHealth, job); } - return new JobHealth(jobCount - notHealthyCount, jobCount); + return jobHealth; } - private int getJobNotHealthyCount(int notHealthyCount, JobKey jobKey) { - if (!isJobHealthy(jobKey)) { - notHealthyCount++; + private JobHealth getHealthInfo(JobHealth jobHealth, GriffinJob job) { + List<Trigger> triggers = getTriggers(job); + if (!CollectionUtils.isEmpty(triggers)) { + jobHealth.setJobCount(jobHealth.getJobCount() + 1); + if (isJobHealthy(job.getId())) { + jobHealth.setHealthyJobCount(jobHealth.getHealthyJobCount() + 1); + } } - return notHealthyCount; + return jobHealth; } - private Boolean isJobHealthy(JobKey jobKey) { - Pageable pageRequest = new PageRequest(0, 1, Sort.Direction.DESC, "timestamp"); - JobInstance latestJobInstance; - List<JobInstance> jobInstances = jobInstanceRepo.findByGroupNameAndJobName(jobKey.getGroup(), jobKey.getName(), pageRequest); - if (jobInstances != null && jobInstances.size() > 0) { - latestJobInstance = jobInstances.get(0); - if (LivySessionStates.isHealthy(latestJobInstance.getState())) 
{ - return true; - } + private List<Trigger> getTriggers(GriffinJob job) { + JobKey jobKey = new JobKey(job.getQuartzName(), job.getQuartzGroup()); + List<Trigger> triggers; + try { + triggers = (List<Trigger>) factory.getObject().getTriggersOfJob(jobKey); + } catch (SchedulerException e) { + LOGGER.error("Job schedule exception. {}", e.getMessage()); + throw new GetHealthInfoFailureException(); } - return false; + return triggers; } - @Override - public Map<String, List<Map<String, Serializable>>> getJobDetailsGroupByMeasureId() { - Map<String, List<Map<String, Serializable>>> jobDetailsMap = new HashMap<>(); - List<Map<String, Serializable>> jobInfoList = getAliveJobs(); - for (Map<String, Serializable> jobInfo : jobInfoList) { - String measureId = (String) jobInfo.get("measureId"); - List<Map<String, Serializable>> jobs = jobDetailsMap.getOrDefault(measureId, new ArrayList<>()); - jobs.add(jobInfo); - jobDetailsMap.put(measureId, jobs); - } - return jobDetailsMap; + private Boolean isJobHealthy(Long jobId) { + Pageable pageable = new PageRequest(0, 1, Sort.Direction.DESC, "tms"); + List<JobInstanceBean> instances = jobInstanceRepo.findByJobId(jobId,pageable); + return !CollectionUtils.isEmpty(instances) && LivySessionStates.isHealthy(instances.get(0).getState()); } + + }
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/Predicator.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/Predicator.java b/service/src/main/java/org/apache/griffin/core/job/Predicator.java new file mode 100644 index 0000000..dd9e105 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/Predicator.java @@ -0,0 +1,26 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ + +package org.apache.griffin.core.job; + +import java.io.IOException; + +public interface Predicator { + boolean predicate() throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java index a1e1e9d..e089d15 100644 --- a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java +++ b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java @@ -21,205 +21,128 @@ package org.apache.griffin.core.job; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; -import org.apache.commons.lang.StringUtils; -import org.apache.griffin.core.job.entity.JobInstance; +import org.apache.griffin.core.job.entity.JobInstanceBean; +import org.apache.griffin.core.job.entity.LivyConf; import org.apache.griffin.core.job.entity.LivySessionStates; -import org.apache.griffin.core.job.entity.SparkJobDO; +import org.apache.griffin.core.job.entity.SegmentPredicate; +import org.apache.griffin.core.job.factory.PredicatorFactory; import org.apache.griffin.core.job.repo.JobInstanceRepo; -import org.apache.griffin.core.measure.entity.DataConnector; -import org.apache.griffin.core.measure.entity.DataSource; -import org.apache.griffin.core.measure.entity.Measure; -import org.apache.griffin.core.measure.repo.MeasureRepo; +import org.apache.griffin.core.measure.entity.GriffinMeasure; import org.apache.griffin.core.util.JsonUtil; import org.quartz.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.util.CollectionUtils; import 
org.springframework.web.client.RestTemplate; import java.io.IOException; -import java.text.SimpleDateFormat; import java.util.*; +import static org.apache.griffin.core.job.JobInstance.*; + @PersistJobDataAfterExecution @DisallowConcurrentExecution public class SparkSubmitJob implements Job { private static final Logger LOGGER = LoggerFactory.getLogger(SparkSubmitJob.class); + private static final String SPARK_JOB_JARS_SPLIT = ";"; @Autowired - private MeasureRepo measureRepo; - @Autowired private JobInstanceRepo jobInstanceRepo; @Autowired - private Properties sparkJobProps; - - /** - * partitionItems - * for example - * partitionItems like "date","hour",... - */ - private String[] partitionItems; - /** - * sourcePatternItems targetPatternItems - * for example - * sourcePatternItems or targetPatternItems is like "YYYYMMDD","HH",... - */ - private String[] sourcePatternItems, targetPatternItems; + @Qualifier("livyConf") + private Properties livyConfProps; + @Autowired + private JobServiceImpl jobService; - private Measure measure; - private String sourcePattern, targetPattern; - private String blockStartTimestamp, lastBlockStartTimestamp; - private String interval; - private String uri; + private GriffinMeasure measure; + private String livyUri; + private List<SegmentPredicate> mPredicts; + private JobInstanceBean jobInstance; private RestTemplate restTemplate = new RestTemplate(); - private SparkJobDO sparkJobDO = new SparkJobDO(); + private LivyConf livyConf = new LivyConf(); - public SparkSubmitJob() { - } - - /** - * execute method is used to submit sparkJobDO to Livy. 
- * - * @param context Job execution context - */ @Override public void execute(JobExecutionContext context) { JobDetail jd = context.getJobDetail(); - String groupName = jd.getJobDataMap().getString("groupName"); - String jobName = jd.getJobDataMap().getString("jobName"); - initParam(jd); - //prepare current system timestamp - long currentBlockStartTimestamp = setCurrentBlockStartTimestamp(System.currentTimeMillis()); - LOGGER.info("currentBlockStartTimestamp: {}", currentBlockStartTimestamp); try { - if (StringUtils.isNotEmpty(sourcePattern)) { - setAllDataConnectorPartitions(measure.getDataSources(), sourcePattern.split("-"), partitionItems, "source", currentBlockStartTimestamp); - } - if (StringUtils.isNotEmpty(targetPattern)) { - setAllDataConnectorPartitions(measure.getDataSources(), targetPattern.split("-"), partitionItems, "target", currentBlockStartTimestamp); + initParam(jd); + setLivyConf(); + if (!success(mPredicts)) { + updateJobInstanceState(context); + return; } + saveJobInstance(jd); } catch (Exception e) { - LOGGER.error("Can not execute job.Set partitions error. {}", e.getMessage()); - return; + LOGGER.error("Post spark task error.", e); } - jd.getJobDataMap().put("lastBlockStartTimestamp", currentBlockStartTimestamp + ""); - setSparkJobDO(); - String result; - try { - result = restTemplate.postForObject(uri, sparkJobDO, String.class); - } catch (Exception e) { - LOGGER.error("Post spark task error. 
{}", e.getMessage()); - return; - } - LOGGER.info(result); - saveJobInstance(groupName, jobName, result); } - private void initParam(JobDetail jd) { - /** - * the field measureId is generated from `setJobData` in `JobServiceImpl` - */ - String measureId = jd.getJobDataMap().getString("measureId"); - measure = measureRepo.findOne(Long.valueOf(measureId)); - if (measure == null) { - LOGGER.error("Measure with id {} is not find!", measureId); - return; + private void updateJobInstanceState(JobExecutionContext context) throws IOException { + SimpleTrigger simpleTrigger = (SimpleTrigger) context.getTrigger(); + int repeatCount = simpleTrigger.getRepeatCount(); + int fireCount = simpleTrigger.getTimesTriggered(); + if (fireCount > repeatCount) { + saveJobInstance(null, LivySessionStates.State.not_found, true); } - setMeasureInstanceName(measure, jd); - partitionItems = sparkJobProps.getProperty("sparkJob.dateAndHour").split(","); - uri = sparkJobProps.getProperty("livy.uri"); - sourcePattern = jd.getJobDataMap().getString("sourcePattern"); - targetPattern = jd.getJobDataMap().getString("targetPattern"); - blockStartTimestamp = jd.getJobDataMap().getString("blockStartTimestamp"); - lastBlockStartTimestamp = jd.getJobDataMap().getString("lastBlockStartTimestamp"); - LOGGER.info("lastBlockStartTimestamp:{}", lastBlockStartTimestamp); - interval = jd.getJobDataMap().getString("interval"); - } - - private void setMeasureInstanceName(Measure measure, JobDetail jd) { - // in order to keep metric name unique, we set measure name as jobName at present - measure.setName(jd.getJobDataMap().getString("jobName")); } - private void setAllDataConnectorPartitions(List<DataSource> sources, String[] patternItemSet, String[] partitionItems, String sourceName, long timestamp) throws IOException { - if (sources == null) { - return; - } - for (DataSource dataSource : sources) { - setDataSourcePartitions(dataSource, patternItemSet, partitionItems, sourceName, timestamp); + private String 
post2Livy() { + String result; + try { + result = restTemplate.postForObject(livyUri, livyConf, String.class); + LOGGER.info(result); + } catch (Exception e) { + LOGGER.error("Post to livy error. {}", e.getMessage()); + result = null; } + return result; } - private void setDataSourcePartitions(DataSource dataSource, String[] patternItemSet, String[] partitionItems, String sourceName, long timestamp) throws IOException { - String name = dataSource.getName(); - for (DataConnector dataConnector : dataSource.getConnectors()) { - if (sourceName.equals(name)) { - setDataConnectorPartitions(dataConnector, patternItemSet, partitionItems, timestamp); - } + private boolean success(List<SegmentPredicate> predicates) throws IOException { + if (CollectionUtils.isEmpty(predicates)) { + return true; } - } + for (SegmentPredicate segPredicate : predicates) { + Predicator predicator = PredicatorFactory.newPredicateInstance(segPredicate); + try { + if (!predicator.predicate()) { + return false; + } + } catch (Exception e) { + return false; + } - private void setDataConnectorPartitions(DataConnector dc, String[] patternItemSet, String[] partitionItems, long timestamp) throws IOException { - Map<String, String> partitionItemMap = genPartitionMap(patternItemSet, partitionItems, timestamp); - /** - * partitions must be a string like: "dt=20170301, hour=12" - * partitionItemMap.toString() is like "{dt=20170301, hour=12}" - */ - String partitions = partitionItemMap.toString().substring(1, partitionItemMap.toString().length() - 1); - partitions = partitions.replaceAll(",", " AND "); - Map<String, String> configMap = dc.getConfigInMaps(); - //config should not be null - configMap.put("where", partitions); - try { - dc.setConfig(configMap); - } catch (JsonProcessingException e) { - LOGGER.error(e.getMessage()); } + return true; } - - private Map<String, String> genPartitionMap(String[] patternItemSet, String[] partitionItems, long timestamp) { - /** - * patternItemSet:{YYYYMMdd,HH} - * 
partitionItems:{dt,hour} - * partitionItemMap:{dt=20170804,hour=09} - */ - int comparableSizeMin = Math.min(patternItemSet.length, partitionItems.length); - Map<String, String> partitionItemMap = new HashMap<>(); - for (int i = 0; i < comparableSizeMin; i++) { - /** - * in order to get a standard date like 20170427 01 (YYYYMMdd-HH) - */ - String pattern = patternItemSet[i].replace("mm", "MM"); - pattern = pattern.replace("DD", "dd"); - pattern = pattern.replace("hh", "HH"); - SimpleDateFormat sdf = new SimpleDateFormat(pattern); - partitionItemMap.put(partitionItems[i], sdf.format(new Date(timestamp))); - } - return partitionItemMap; + private void initParam(JobDetail jd) throws IOException { + mPredicts = new ArrayList<>(); + livyUri = livyConfProps.getProperty("livy.uri"); + jobInstance = jobInstanceRepo.findByPredicateName(jd.getJobDataMap().getString(PREDICATE_JOB_NAME)); + measure = JsonUtil.toEntity(jd.getJobDataMap().getString(MEASURE_KEY), GriffinMeasure.class); + setPredicts(jd.getJobDataMap().getString(PREDICATES_KEY)); + setMeasureInstanceName(measure, jd); } - - private long setCurrentBlockStartTimestamp(long currentSystemTimestamp) { - long currentBlockStartTimestamp = 0; - if (StringUtils.isNotEmpty(lastBlockStartTimestamp)) { - try { - currentBlockStartTimestamp = Long.parseLong(lastBlockStartTimestamp) + Integer.parseInt(interval) * 1000; - } catch (Exception e) { - LOGGER.info("lastBlockStartTimestamp or interval format problem! {}", e.getMessage()); - } - } else { - if (StringUtils.isNotEmpty(blockStartTimestamp)) { - try { - currentBlockStartTimestamp = Long.parseLong(blockStartTimestamp); - } catch (Exception e) { - LOGGER.info("blockStartTimestamp format problem! 
{}", e.getMessage()); - } - } else { - currentBlockStartTimestamp = currentSystemTimestamp; + private void setPredicts(String json) throws IOException { + List<Map<String, Object>> maps = JsonUtil.toEntity(json, new TypeReference<List<Map>>() { + }); + if (maps != null) { + for (Map<String, Object> map : maps) { + SegmentPredicate sp = new SegmentPredicate(); + sp.setType((String) map.get("type")); + sp.setConfigMap((Map<String, String>) map.get("config")); + mPredicts.add(sp); } } - return currentBlockStartTimestamp; + } + + private void setMeasureInstanceName(GriffinMeasure measure, JobDetail jd) { + // in order to keep metric name unique, we set job name as measure name at present + measure.setName(jd.getJobDataMap().getString(JOB_NAME)); } private String escapeCharacter(String str, String regex) { @@ -227,67 +150,85 @@ public class SparkSubmitJob implements Job { return str.replaceAll(regex, escapeCh); } - private void setSparkJobDO() { - sparkJobDO.setFile(sparkJobProps.getProperty("sparkJob.file")); - sparkJobDO.setClassName(sparkJobProps.getProperty("sparkJob.className")); + private void setLivyConf() throws JsonProcessingException { + setLivyParams(); + setLivyArgs(); + setLivyJars(); + setPropConf(); + } + private void setLivyParams() { + livyConf.setFile(livyConfProps.getProperty("sparkJob.file")); + livyConf.setClassName(livyConfProps.getProperty("sparkJob.className")); + livyConf.setName(livyConfProps.getProperty("sparkJob.name")); + livyConf.setQueue(livyConfProps.getProperty("sparkJob.queue")); + livyConf.setNumExecutors(Long.parseLong(livyConfProps.getProperty("sparkJob.numExecutors"))); + livyConf.setExecutorCores(Long.parseLong(livyConfProps.getProperty("sparkJob.executorCores"))); + livyConf.setDriverMemory(livyConfProps.getProperty("sparkJob.driverMemory")); + livyConf.setExecutorMemory(livyConfProps.getProperty("sparkJob.executorMemory")); + livyConf.setFiles(new ArrayList<>()); + } + + private void setLivyArgs() throws JsonProcessingException { 
List<String> args = new ArrayList<>(); - args.add(sparkJobProps.getProperty("sparkJob.args_1")); - measure.setTriggerTimeStamp(System.currentTimeMillis()); + args.add(livyConfProps.getProperty("sparkJob.args_1")); String measureJson = JsonUtil.toJsonWithFormat(measure); - // to fix livy bug: ` will be ignored by livy + // to fix livy bug: character ` will be ignored by livy String finalMeasureJson = escapeCharacter(measureJson, "\\`"); + LOGGER.info(finalMeasureJson); args.add(finalMeasureJson); - args.add(sparkJobProps.getProperty("sparkJob.args_3")); - sparkJobDO.setArgs(args); + args.add(livyConfProps.getProperty("sparkJob.args_3")); + livyConf.setArgs(args); + } - sparkJobDO.setName(sparkJobProps.getProperty("sparkJob.name")); - sparkJobDO.setQueue(sparkJobProps.getProperty("sparkJob.queue")); - sparkJobDO.setNumExecutors(Long.parseLong(sparkJobProps.getProperty("sparkJob.numExecutors"))); - sparkJobDO.setExecutorCores(Long.parseLong(sparkJobProps.getProperty("sparkJob.executorCores"))); - sparkJobDO.setDriverMemory(sparkJobProps.getProperty("sparkJob.driverMemory")); - sparkJobDO.setExecutorMemory(sparkJobProps.getProperty("sparkJob.executorMemory")); + private void setLivyJars() { + String jarProp = livyConfProps.getProperty("sparkJob.jars"); + List<String> jars = Arrays.asList(jarProp.split(SPARK_JOB_JARS_SPLIT)); + livyConf.setJars(jars); + } + private void setPropConf() { Map<String, String> conf = new HashMap<>(); - conf.put("spark.jars.packages", sparkJobProps.getProperty("sparkJob.spark.jars.packages")); - sparkJobDO.setConf(conf); - - List<String> jars = new ArrayList<>(); - jars.add(sparkJobProps.getProperty("sparkJob.jars_1")); - jars.add(sparkJobProps.getProperty("sparkJob.jars_2")); - jars.add(sparkJobProps.getProperty("sparkJob.jars_3")); - sparkJobDO.setJars(jars); + conf.put("spark.yarn.dist.files", livyConfProps.getProperty("spark.yarn.dist.files")); + livyConf.setConf(conf); + } - List<String> files = new ArrayList<>(); - 
sparkJobDO.setFiles(files); + private void saveJobInstance(JobDetail jd) throws SchedulerException, IOException { + String result = post2Livy(); + boolean pauseStatus = false; + if (result != null) { + pauseStatus = jobService.pauseJob(jd.getKey().getGroup(), jd.getKey().getName()); + LOGGER.info("Delete predicate job {}.", pauseStatus ? "success" : "failure"); + } + saveJobInstance(result, LivySessionStates.State.found, pauseStatus); } - public void saveJobInstance(String groupName, String jobName, String result) { + private void saveJobInstance(String result, LivySessionStates.State state, Boolean pauseStatus) throws IOException { TypeReference<HashMap<String, Object>> type = new TypeReference<HashMap<String, Object>>() { }; - try { - Map<String, Object> resultMap = JsonUtil.toEntity(result, type); - if (resultMap != null) { - JobInstance jobInstance = genJobInstance(groupName, jobName, resultMap); - jobInstanceRepo.save(jobInstance); - } - } catch (IOException e) { - LOGGER.error("jobInstance jsonStr convert to map failed. {}", e.getMessage()); - } catch (IllegalArgumentException e) { - LOGGER.warn("Livy status is illegal. 
{}", e.getMessage()); + Map<String, Object> resultMap = null; + if (result != null) { + resultMap = JsonUtil.toEntity(result, type); } + setJobInstance(resultMap, state, pauseStatus); + jobInstanceRepo.save(jobInstance); } - private JobInstance genJobInstance(String groupName, String jobName, Map<String, Object> resultMap) throws IllegalArgumentException { - JobInstance jobInstance = new JobInstance(); - jobInstance.setGroupName(groupName); - jobInstance.setJobName(jobName); - jobInstance.setTimestamp(System.currentTimeMillis()); - jobInstance.setSessionId(Integer.parseInt(resultMap.get("id").toString())); - jobInstance.setState(LivySessionStates.State.valueOf(resultMap.get("state").toString())); + private void setJobInstance(Map<String, Object> resultMap, LivySessionStates.State state, Boolean pauseStatus) { + jobInstance.setState(state); + jobInstance.setDeleted(pauseStatus); + if (resultMap == null) { + return; + } + if (resultMap.get("state") != null) { + jobInstance.setState(LivySessionStates.State.valueOf(resultMap.get("state").toString())); + } + if (resultMap.get("id") != null) { + jobInstance.setSessionId(Long.parseLong(resultMap.get("id").toString())); + } if (resultMap.get("appId") != null) { jobInstance.setAppId(resultMap.get("appId").toString()); } - return jobInstance; } + } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java b/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java new file mode 100644 index 0000000..21ceec9 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java @@ -0,0 +1,88 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.job.entity; + +import org.apache.griffin.core.measure.entity.AbstractAuditableEntity; + +import javax.persistence.*; + +@Entity +@Table(name = "job") +@Inheritance(strategy = InheritanceType.SINGLE_TABLE) +@DiscriminatorColumn(name = "type") +public abstract class AbstractJob extends AbstractAuditableEntity { + private static final long serialVersionUID = 7569493377868453677L; + + protected Long measureId; + + protected String jobName; + + protected String metricName; + + protected Boolean deleted = false; + + AbstractJob() { + } + + AbstractJob(Long measureId, String jobName, boolean deleted) { + this.measureId = measureId; + this.jobName = jobName; + this.deleted = deleted; + } + + AbstractJob(String jobName, Long measureId, String metricName) { + this.jobName = jobName; + this.measureId = measureId; + this.metricName = metricName; + } + + public String getJobName() { + return jobName; + } + + public void setJobName(String jobName) { + this.jobName = jobName; + } + + public String getMetricName() { + return metricName; + } + + public void setMetricName(String metricName) { + this.metricName = metricName; + } + + public Long getMeasureId() { + return measureId; + } + + public void setMeasureId(Long measureId) { + this.measureId = measureId; + } + + public Boolean 
getDeleted() { + return deleted; + } + + public void setDeleted(Boolean deleted) { + this.deleted = deleted; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java b/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java new file mode 100644 index 0000000..65d8e15 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java @@ -0,0 +1,79 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ + +package org.apache.griffin.core.job.entity; + +import javax.persistence.*; +import java.util.ArrayList; +import java.util.List; + +@Entity +@DiscriminatorValue("griffin_job") +public class GriffinJob extends AbstractJob { + + @Column(name = "quartz_job_name") + private String quartzName; + + @Column(name = "quartz_group_name") + private String quartzGroup; + + @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE}, orphanRemoval = true) + @JoinColumn(name = "job_id") + private List<JobInstanceBean> jobInstances = new ArrayList<>(); + + public String getQuartzName() { + return quartzName; + } + + public void setQuartzName(String quartzName) { + this.quartzName = quartzName; + } + + public String getQuartzGroup() { + return quartzGroup; + } + + public void setQuartzGroup(String quartzGroup) { + this.quartzGroup = quartzGroup; + } + + public List<JobInstanceBean> getJobInstances() { + return jobInstances; + } + + public void setJobInstances(List<JobInstanceBean> jobInstances) { + this.jobInstances = jobInstances; + } + + public GriffinJob() { + super(); + } + + public GriffinJob(Long measureId, String jobName, String qJobName, String qGroupName, boolean deleted) { + super(measureId, jobName, deleted); + this.metricName = jobName; + this.quartzName = qJobName; + this.quartzGroup = qGroupName; + } + + public GriffinJob(Long jobId, Long measureId, String jobName, String qJobName, String qGroupName, boolean deleted) { + this(measureId, jobName, qJobName, qGroupName, deleted); + setId(jobId); + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java new file mode 100644 index 0000000..b27ab91 
--- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java @@ -0,0 +1,98 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.job.entity; + + +import org.quartz.Trigger; + +public class JobDataBean { + + private Long jobId; + + private String jobName; + + private Long measureId; + + private Trigger.TriggerState triggerState; + + private Long nextFireTime; + + private Long previousFireTime; + + private String cronExpression; + + public Long getJobId() { + return jobId; + } + + public void setJobId(Long jobId) { + this.jobId = jobId; + } + + public String getJobName() { + return jobName; + } + + public void setJobName(String jobName) { + this.jobName = jobName; + } + + public Long getMeasureId() { + return measureId; + } + + public void setMeasureId(Long measureId) { + this.measureId = measureId; + } + + public Trigger.TriggerState getTriggerState() { + return triggerState; + } + + public void setTriggerState(Trigger.TriggerState triggerState) { + this.triggerState = triggerState; + } + + public Long getNextFireTime() { + return nextFireTime; + } + + public void setNextFireTime(Long nextFireTime) { + this.nextFireTime = nextFireTime; + } + + public Long getPreviousFireTime() { + return 
previousFireTime; + } + + public void setPreviousFireTime(Long previousFireTime) { + this.previousFireTime = previousFireTime; + } + + public String getCronExpression() { + return cronExpression; + } + + public void setCronExpression(String cronExpression) { + this.cronExpression = cronExpression; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java new file mode 100644 index 0000000..7009b5d --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java @@ -0,0 +1,81 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ + +package org.apache.griffin.core.job.entity; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.commons.lang.StringUtils; +import org.apache.griffin.core.measure.entity.AbstractAuditableEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.persistence.*; +import javax.validation.constraints.NotNull; + +@Entity +public class JobDataSegment extends AbstractAuditableEntity { + + private static final Logger LOGGER = LoggerFactory.getLogger(JobDataSegment.class); + + @NotNull + private String dataConnectorName; + + private Boolean baseline = false; + + @OneToOne(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE}) + @JoinColumn(name = "segment_range_id") + private SegmentRange segmentRange; + + @JsonProperty("as.baseline") + public Boolean getBaseline() { + return baseline; + } + + @JsonProperty("as.baseline") + public void setBaseline(Boolean baseline) { + this.baseline = baseline; + } + + @JsonProperty("segment.range") + public SegmentRange getSegmentRange() { + return segmentRange; + } + + @JsonProperty("segment.range") + public void setSegmentRange(SegmentRange segmentRange) { + this.segmentRange = segmentRange; + } + + @JsonProperty("data.connector.name") + public String getDataConnectorName() { + return dataConnectorName; + } + + @JsonProperty("data.connector.name") + public void setDataConnectorName(String dataConnectorName) { + if (StringUtils.isEmpty(dataConnectorName)) { + LOGGER.error(" Data connector name is invalid. 
Please check your connector name."); + throw new NullPointerException(); + } + this.dataConnectorName = dataConnectorName; + } + + public JobDataSegment() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/JobHealth.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobHealth.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobHealth.java index 9d2a654..ecb5feb 100644 --- a/service/src/main/java/org/apache/griffin/core/job/entity/JobHealth.java +++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobHealth.java @@ -40,6 +40,8 @@ public class JobHealth { } public JobHealth() { + this.healthyJobCount = 0; + this.jobCount = 0; } public JobHealth(int healthyJobCount, int jobCount) { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/JobInstance.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobInstance.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobInstance.java deleted file mode 100644 index 2cb5949..0000000 --- a/service/src/main/java/org/apache/griffin/core/job/entity/JobInstance.java +++ /dev/null @@ -1,111 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package org.apache.griffin.core.job.entity; - -import org.apache.griffin.core.job.entity.LivySessionStates.State; -import org.apache.griffin.core.measure.entity.AbstractAuditableEntity; - -import javax.persistence.*; - -@Entity -public class JobInstance extends AbstractAuditableEntity { - - private static final long serialVersionUID = -4748881017029815874L; - - private String groupName; - private String jobName; - private int sessionId; - @Enumerated(EnumType.STRING) - private State state; - private String appId; - @Lob - @Column(length = 1024) - private String appUri; - private long timestamp; - - public String getGroupName() { - return groupName; - } - - public void setGroupName(String groupName) { - this.groupName = groupName; - } - - public String getJobName() { - return jobName; - } - - public void setJobName(String jobName) { - this.jobName = jobName; - } - - public int getSessionId() { - return sessionId; - } - - public void setSessionId(int sessionId) { - this.sessionId = sessionId; - } - - public State getState() { - return state; - } - - public void setState(State state) { - this.state = state; - } - - public String getAppId() { - return appId; - } - - public void setAppId(String appId) { - this.appId = appId; - } - - public String getAppUri() { - return appUri; - } - - public void setAppUri(String appUri) { - this.appUri = appUri; - } - - public long getTimestamp() { - return timestamp; - } - - public void setTimestamp(long timestamp) { - this.timestamp = timestamp; - } - - public JobInstance() { - } - - public JobInstance(String groupName, String 
jobName, int sessionId, State state, String appId, String appUri, long timestamp) { - this.groupName = groupName; - this.jobName = jobName; - this.sessionId = sessionId; - this.state = state; - this.appId = appId; - this.appUri = appUri; - this.timestamp = timestamp; - } -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java new file mode 100644 index 0000000..ff4d444 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java @@ -0,0 +1,156 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ + +package org.apache.griffin.core.job.entity; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.griffin.core.job.entity.LivySessionStates.State; +import org.apache.griffin.core.measure.entity.AbstractAuditableEntity; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.EnumType; +import javax.persistence.Enumerated; + +@Entity +public class JobInstanceBean extends AbstractAuditableEntity { + + private static final long serialVersionUID = -4748881017029815874L; + + private Long sessionId; + + @Enumerated(EnumType.STRING) + private State state; + + private String appId; + + @Column(length = 10 * 1024) + private String appUri; + + @Column(name = "timestamp") + private Long tms; + + @Column(name = "expire_timestamp") + private Long expireTms; + + @Column(name = "predicate_group_name") + private String predicateGroup; + + @Column(name = "predicate_job_name") + private String predicateName; + + @Column(name = "predicate_job_deleted") + private Boolean deleted = false; + + public Long getSessionId() { + return sessionId; + } + + public void setSessionId(Long sessionId) { + this.sessionId = sessionId; + } + + public State getState() { + return state; + } + + public void setState(State state) { + this.state = state; + } + + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + public String getAppUri() { + return appUri; + } + + public void setAppUri(String appUri) { + this.appUri = appUri; + } + + @JsonProperty("timestamp") + public Long getTms() { + return tms; + } + + @JsonProperty("timestamp") + public void setTms(Long tms) { + this.tms = tms; + } + + @JsonProperty("expireTimestamp") + public Long getExpireTms() { + return expireTms; + } + + @JsonProperty("expireTimestamp") + public void setExpireTms(Long expireTms) { + this.expireTms = expireTms; + } + + public String getPredicateGroup() { + return predicateGroup; + } + + public void 
setPredicateGroup(String predicateGroup) { + this.predicateGroup = predicateGroup; + } + + public String getPredicateName() { + return predicateName; + } + + public void setPredicateName(String predicateName) { + this.predicateName = predicateName; + } + + public Boolean getDeleted() { + return deleted; + } + + public void setDeleted(Boolean deleted) { + this.deleted = deleted; + } + + public JobInstanceBean() { + } + + public JobInstanceBean(State state, String pName, String pGroup, Long tms, Long expireTms) { + this.state = state; + this.predicateName = pName; + this.predicateGroup = pGroup; + this.tms = tms; + this.expireTms = expireTms; + } + + public JobInstanceBean(Long sessionId, State state, String appId, String appUri, Long timestamp, Long expireTms) { + this.sessionId = sessionId; + this.state = state; + this.appId = appId; + this.appUri = appUri; + this.tms = timestamp; + this.expireTms = expireTms; + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/JobRequestBody.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobRequestBody.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobRequestBody.java deleted file mode 100644 index 0d0ea40..0000000 --- a/service/src/main/java/org/apache/griffin/core/job/entity/JobRequestBody.java +++ /dev/null @@ -1,114 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.core.job.entity; - -public class JobRequestBody { - private String sourcePattern; - private String targetPattern; - private String blockStartTimestamp; - private String jobStartTime; - private String interval; - - public String getSourcePattern() { - return sourcePattern; - } - - public void setSourcePattern(String sourcePattern) { - this.sourcePattern = sourcePattern; - } - - public String getTargetPattern() { - return targetPattern; - } - - public void setTargetPattern(String targetPattern) { - this.targetPattern = targetPattern; - } - - public String getBlockStartTimestamp() { - return blockStartTimestamp; - } - - public void setBlockStartTimestamp(String blockStartTimestamp) { - this.blockStartTimestamp = blockStartTimestamp; - } - - public String getJobStartTime() { - return jobStartTime; - } - - public void setJobStartTime(String jobStartTime) { - this.jobStartTime = jobStartTime; - } - - public String getInterval() { - return interval; - } - - public void setInterval(String interval) { - this.interval = interval; - } - - public JobRequestBody() { - } - - public JobRequestBody(String sourcePattern, String targetPattern, String blockStartTimestamp, String jobStartTime, String interval) { - this.sourcePattern = sourcePattern; - this.targetPattern = targetPattern; - this.blockStartTimestamp = blockStartTimestamp; - this.jobStartTime = jobStartTime; - this.interval = interval; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - 
JobRequestBody that = (JobRequestBody) o; - - if (sourcePattern != null ? !sourcePattern.equals(that.sourcePattern) : that.sourcePattern != null) { - return false; - } - if (targetPattern != null ? !targetPattern.equals(that.targetPattern) : that.targetPattern != null) { - return false; - } - if (blockStartTimestamp != null ? !blockStartTimestamp.equals(that.blockStartTimestamp) : that.blockStartTimestamp != null) { - return false; - } - if (jobStartTime != null ? !jobStartTime.equals(that.jobStartTime) : that.jobStartTime != null){ - return false; - } - return interval != null ? interval.equals(that.interval) : that.interval == null; - } - - @Override - public int hashCode() { - int result = sourcePattern != null ? sourcePattern.hashCode() : 0; - result = 31 * result + (targetPattern != null ? targetPattern.hashCode() : 0); - result = 31 * result + (blockStartTimestamp != null ? blockStartTimestamp.hashCode() : 0); - result = 31 * result + (jobStartTime != null ? jobStartTime.hashCode() : 0); - result = 31 * result + (interval != null ? interval.hashCode() : 0); - return result; - } -}
