http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java index e66c7f3..6e04122 100644 --- a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java +++ b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java @@ -19,21 +19,22 @@ under the License. package org.apache.griffin.core.job; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import org.apache.griffin.core.job.entity.JobInstanceBean; import org.apache.griffin.core.job.entity.LivyConf; -import org.apache.griffin.core.job.entity.LivySessionStates; import org.apache.griffin.core.job.entity.SegmentPredicate; import org.apache.griffin.core.job.factory.PredicatorFactory; import org.apache.griffin.core.job.repo.JobInstanceRepo; import org.apache.griffin.core.measure.entity.GriffinMeasure; +import org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType; +import org.apache.griffin.core.util.FileUtil; import org.apache.griffin.core.util.JsonUtil; import org.quartz.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import org.springframework.web.client.RestTemplate; @@ -42,9 +43,14 @@ import java.io.IOException; import java.util.*; import static org.apache.griffin.core.job.JobInstance.*; +import static org.apache.griffin.core.job.entity.LivySessionStates.State; +import static 
org.apache.griffin.core.job.entity.LivySessionStates.State.FOUND; +import static org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_FOUND; +import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH; @PersistJobDataAfterExecution @DisallowConcurrentExecution +@Component public class SparkSubmitJob implements Job { private static final Logger LOGGER = LoggerFactory.getLogger(SparkSubmitJob.class); private static final String SPARK_JOB_JARS_SPLIT = ";"; @@ -55,7 +61,7 @@ public class SparkSubmitJob implements Job { @Qualifier("livyConf") private Properties livyConfProps; @Autowired - private JobServiceImpl jobService; + private BatchJobOperatorImpl batchJobOp; private GriffinMeasure measure; private String livyUri; @@ -76,7 +82,7 @@ public class SparkSubmitJob implements Job { } saveJobInstance(jd); } catch (Exception e) { - LOGGER.error("Post spark task error.", e); + LOGGER.error("Post spark task ERROR.", e); } } @@ -85,7 +91,7 @@ public class SparkSubmitJob implements Job { int repeatCount = simpleTrigger.getRepeatCount(); int fireCount = simpleTrigger.getTimesTriggered(); if (fireCount > repeatCount) { - saveJobInstance(null, LivySessionStates.State.not_found); + saveJobInstance(null, NOT_FOUND); } } @@ -95,7 +101,7 @@ public class SparkSubmitJob implements Job { result = restTemplate.postForObject(livyUri, livyConf, String.class); LOGGER.info(result); } catch (Exception e) { - LOGGER.error("Post to livy error. {}", e.getMessage()); + LOGGER.error("Post to livy ERROR. 
{}", e.getMessage()); result = null; } return result; @@ -121,11 +127,12 @@ public class SparkSubmitJob implements Job { private void initParam(JobDetail jd) throws IOException { mPredicates = new ArrayList<>(); - livyUri = livyConfProps.getProperty("livy.uri"); jobInstance = jobInstanceRepo.findByPredicateName(jd.getJobDataMap().getString(PREDICATE_JOB_NAME)); measure = JsonUtil.toEntity(jd.getJobDataMap().getString(MEASURE_KEY), GriffinMeasure.class); + livyUri = livyConfProps.getProperty("livy.uri"); setPredicates(jd.getJobDataMap().getString(PREDICATES_KEY)); - setMeasureInstanceName(measure, jd); + // in order to keep metric name unique, we set job name as measure name at present + measure.setName(jd.getJobDataMap().getString(JOB_NAME)); } @SuppressWarnings("unchecked") @@ -135,26 +142,26 @@ public class SparkSubmitJob implements Job { } List<Map<String, Object>> maps = JsonUtil.toEntity(json, new TypeReference<List<Map>>() { }); - assert maps != null; for (Map<String, Object> map : maps) { SegmentPredicate sp = new SegmentPredicate(); sp.setType((String) map.get("type")); - sp.setConfigMap((Map<String, String>) map.get("config")); + sp.setConfigMap((Map<String, Object>) map.get("config")); mPredicates.add(sp); } } - private void setMeasureInstanceName(GriffinMeasure measure, JobDetail jd) { - // in order to keep metric name unique, we set job name as measure name at present - measure.setName(jd.getJobDataMap().getString(JOB_NAME)); - } - private String escapeCharacter(String str, String regex) { String escapeCh = "\\" + regex; return str.replaceAll(regex, escapeCh); } - private void setLivyConf() throws JsonProcessingException { + private String genEnv() throws IOException { + ProcessType type = measure.getProcessType(); + String env = type == BATCH ? 
FileUtil.readEnv("env/env_batch.json") : FileUtil.readEnv("env/env_streaming.json"); + return env.replaceAll("\\$\\{JOB_NAME}", measure.getName()); + } + + private void setLivyConf() throws IOException { setLivyParams(); setLivyArgs(); setLivyJars(); @@ -173,15 +180,15 @@ public class SparkSubmitJob implements Job { livyConf.setFiles(new ArrayList<>()); } - private void setLivyArgs() throws JsonProcessingException { + private void setLivyArgs() throws IOException { List<String> args = new ArrayList<>(); - args.add(livyConfProps.getProperty("sparkJob.args_1")); + args.add(genEnv()); String measureJson = JsonUtil.toJsonWithFormat(measure); // to fix livy bug: character ` will be ignored by livy String finalMeasureJson = escapeCharacter(measureJson, "\\`"); LOGGER.info(finalMeasureJson); args.add(finalMeasureJson); - args.add(livyConfProps.getProperty("sparkJob.args_3")); + args.add("raw,raw"); livyConf.setArgs(args); } @@ -202,16 +209,14 @@ public class SparkSubmitJob implements Job { private void saveJobInstance(JobDetail jd) throws SchedulerException, IOException { String result = post2Livy(); - if (result != null) { - String group = jd.getKey().getGroup(); - String name = jd.getKey().getName(); - jobService.pauseJob(group, name); - LOGGER.info("Delete predicate job({},{}) success.", group, name); - } - saveJobInstance(result, LivySessionStates.State.found); + String group = jd.getKey().getGroup(); + String name = jd.getKey().getName(); + batchJobOp.deleteJob(group, name); + LOGGER.info("Delete predicate job({},{}) SUCCESS.", group, name); + saveJobInstance(result, FOUND); } - private void saveJobInstance(String result, LivySessionStates.State state) throws IOException { + private void saveJobInstance(String result,State state) throws IOException { TypeReference<HashMap<String, Object>> type = new TypeReference<HashMap<String, Object>>() { }; Map<String, Object> resultMap = null; @@ -222,20 +227,16 @@ public class SparkSubmitJob implements Job { 
jobInstanceRepo.save(jobInstance); } - private void setJobInstance(Map<String, Object> resultMap, LivySessionStates.State state) { + private void setJobInstance(Map<String, Object> resultMap, State state) { jobInstance.setState(state); - jobInstance.setDeleted(true); - if (resultMap == null) { - return; - } - if (resultMap.get("state") != null) { - jobInstance.setState(LivySessionStates.State.valueOf(resultMap.get("state").toString())); - } - if (resultMap.get("id") != null) { - jobInstance.setSessionId(Long.parseLong(resultMap.get("id").toString())); - } - if (resultMap.get("appId") != null) { - jobInstance.setAppId(resultMap.get("appId").toString()); + jobInstance.setPredicateDeleted(true); + if (resultMap != null) { + Object status = resultMap.get("state"); + Object id = resultMap.get("id"); + Object appId = resultMap.get("appId"); + jobInstance.setState(status == null ? null : State.valueOf(status.toString().toUpperCase())); + jobInstance.setSessionId(id == null ? null : Long.parseLong(id.toString())); + jobInstance.setAppId(appId == null ? null : appId.toString()); } }
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java b/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java new file mode 100644 index 0000000..55c64ab --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java @@ -0,0 +1,265 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ + +package org.apache.griffin.core.job; + +import org.apache.griffin.core.exception.GriffinException; +import org.apache.griffin.core.job.entity.*; +import org.apache.griffin.core.job.repo.JobInstanceRepo; +import org.apache.griffin.core.job.repo.StreamingJobRepo; +import org.apache.griffin.core.measure.entity.GriffinMeasure; +import org.apache.griffin.core.util.YarnNetUtil; +import org.quartz.JobKey; +import org.quartz.SchedulerException; +import org.quartz.Trigger; +import org.quartz.TriggerKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.quartz.SchedulerFactoryBean; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.util.CollectionUtils; +import org.springframework.web.client.ResourceAccessException; +import org.springframework.web.client.RestClientException; +import org.springframework.web.client.RestTemplate; + +import javax.annotation.PostConstruct; +import java.util.List; +import java.util.Properties; + +import static org.apache.griffin.core.exception.GriffinExceptionMessage.INVALID_JOB_NAME; +import static org.apache.griffin.core.exception.GriffinExceptionMessage.STREAMING_JOB_IS_RUNNING; +import static org.apache.griffin.core.job.JobServiceImpl.START; +import static org.apache.griffin.core.job.JobServiceImpl.STOP; +import static org.apache.griffin.core.job.entity.LivySessionStates.State; +import static org.apache.griffin.core.job.entity.LivySessionStates.State.STOPPED; +import static org.apache.griffin.core.job.entity.LivySessionStates.convert2QuartzState; +import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.STREAMING; +import static org.quartz.TriggerKey.triggerKey; + +@Service +public class StreamingJobOperatorImpl implements JobOperator { + private 
static final Logger LOGGER = LoggerFactory.getLogger(StreamingJobOperatorImpl.class); + @Autowired + private StreamingJobRepo streamingJobRepo; + @Autowired + @Qualifier("livyConf") + private Properties livyConfProps; + @Autowired + private JobServiceImpl jobService; + @Autowired + private JobInstanceRepo instanceRepo; + @Autowired + private SchedulerFactoryBean factory; + + private String livyUri; + private RestTemplate restTemplate; + + @PostConstruct + public void init() { + restTemplate = new RestTemplate(); + livyUri = livyConfProps.getProperty("livy.uri"); + } + + @Override + @Transactional(rollbackFor = Exception.class) + public JobSchedule add(JobSchedule js, GriffinMeasure measure) throws Exception { + validateParams(js); + String qName = jobService.getQuartzName(js); + String qGroup = jobService.getQuartzGroup(); + TriggerKey triggerKey = jobService.getTriggerKeyIfValid(qName, qGroup); + StreamingJob streamingJob = new StreamingJob(js.getMeasureId(), js.getJobName(), qName, qGroup, false); + streamingJob.setJobSchedule(js); + streamingJob = streamingJobRepo.save(streamingJob); + jobService.addJob(triggerKey, js, streamingJob, STREAMING); + JobSchedule jobSchedule = streamingJob.getJobSchedule(); + jobSchedule.setId(streamingJob.getId()); + return jobSchedule; + } + + /** + * active states: NOT_STARTED, STARTING, RECOVERING, IDLE, RUNNING, BUSY + * inactive states: SHUTTING_DOWN, ERROR, DEAD, SUCCESS + * + * @param job streaming job + */ + @Override + @Transactional(rollbackFor = Exception.class) + public void start(AbstractJob job) throws Exception { + StreamingJob streamingJob = (StreamingJob) job; + verifyJobState(streamingJob); + streamingJob = streamingJobRepo.save(streamingJob); + JobSchedule js = streamingJob.getJobSchedule(); + String qName = jobService.getQuartzName(js); + String qGroup = jobService.getQuartzGroup(); + TriggerKey triggerKey = triggerKey(qName, qGroup); + jobService.addJob(triggerKey, js, streamingJob, STREAMING); + } + + private 
void verifyJobState(StreamingJob job) throws SchedulerException { + /* Firstly you should check whether job is scheduled. If it is scheduled, triggers are not empty. */ + List<? extends Trigger> triggers = jobService.getTriggers(job.getName(), job.getGroup()); + if (!CollectionUtils.isEmpty(triggers)) { + throw new GriffinException.BadRequestException(STREAMING_JOB_IS_RUNNING); + } + /* Secondly you should check whether job instance is running. */ + List<JobInstanceBean> instances = instanceRepo.findByJobId(job.getId()); + instances.stream().filter(instance -> !instance.isDeleted()).forEach(instance -> { + State state = instance.getState(); + String quartzState = convert2QuartzState(state); + if (!getStartStatus(quartzState)) { + throw new GriffinException.BadRequestException(STREAMING_JOB_IS_RUNNING); + } + instance.setDeleted(true); + }); + } + + + @Override + @Transactional(rollbackFor = Exception.class) + public void stop(AbstractJob job) throws SchedulerException { + StreamingJob streamingJob = (StreamingJob) job; + stop(streamingJob, false); + } + + @Override + public void delete(AbstractJob job) throws SchedulerException { + StreamingJob streamingJob = (StreamingJob) job; + stop(streamingJob, true); + } + + + @Override + public JobHealth getHealth(JobHealth jobHealth, AbstractJob job) { + jobHealth.setJobCount(jobHealth.getJobCount() + 1); + if (jobService.isJobHealthy(job.getId())) { + jobHealth.setHealthyJobCount(jobHealth.getHealthyJobCount() + 1); + } + return jobHealth; + } + + @Override + public JobState getState(AbstractJob job, JobDataBean bean, String action) { + JobState jobState = new JobState(); + List<JobInstanceBean> instances = instanceRepo.findByJobId(job.getId()); + for (JobInstanceBean instance : instances) { + State state = instance.getState(); + if (!instance.isDeleted() && state != null) { + String quartzState = convert2QuartzState(state); + jobState.setState(quartzState); + jobState.setToStart(getStartStatus(quartzState)); + 
jobState.setToStop(getStopStatus(quartzState)); + break; + } + } + setStateIfNull(action, jobState); + return jobState; + } + + private void setStateIfNull(String action, JobState jobState) { + if (jobState.getState() == null && START.equals(action)) { + jobState.setState("NORMAL"); + jobState.setToStop(true); + } else if (jobState.getState() == null || STOP.equals(action)) { + jobState.setState("NONE"); + jobState.setToStart(true); + } + + } + + /** + * NORMAL or BLOCKED state of job cannot be started + * + * @param state job state + * @return true: job can be started, false: job is running which cannot be started + */ + private boolean getStartStatus(String state) { + return !"NORMAL".equals(state) && !"BLOCKED".equals(state); + } + + /** + * COMPLETE or ERROR state of job cannot be stopped + * + * @param state job state + * @return true: job can be stopped, false: job is in COMPLETE or ERROR state which cannot be stopped + */ + private boolean getStopStatus(String state) { + return !"COMPLETE".equals(state) && !"ERROR".equals(state); + } + + private void deleteByLivy(JobInstanceBean instance) { + Long sessionId = instance.getSessionId(); + if (sessionId == null) { + LOGGER.warn("Session id of instance({},{}) is null.", instance.getPredicateGroup(), instance.getPredicateName()); + return; + } + String url = livyUri + "/" + instance.getSessionId(); + try { + restTemplate.delete(url); + LOGGER.info("Job instance({}) has been deleted. {}", instance.getSessionId(), url); + } catch (ResourceAccessException e) { + LOGGER.error("Your url may be wrong. 
Please check {}.\n {}", livyUri, e.getMessage()); + } catch (RestClientException e) { + LOGGER.warn("sessionId({}) appId({}) {}.", instance.getSessionId(), instance.getAppId(), e.getMessage()); + YarnNetUtil.delete(livyUri, instance.getAppId()); + } + } + + + /** + * @param job streaming job + * @param delete true: delete job, false: only stop instance, but not delete job + */ + private void stop(StreamingJob job, boolean delete) throws SchedulerException { + pauseJob(job); + /* to prevent situation that streaming job is submitted before pause or when pausing. */ + List<JobInstanceBean> instances = instanceRepo.findByJobId(job.getId()); + instances.stream().filter(instance -> !instance.isDeleted()).forEach(instance -> { + State state = instance.getState(); + String quartzState = convert2QuartzState(state); + if (getStopStatus(quartzState)) { + deleteByLivy(instance); + + } + instance.setState(STOPPED); + instance.setDeleted(true); + }); + job.setDeleted(delete); + streamingJobRepo.save(job); + } + + private void pauseJob(StreamingJob job) throws SchedulerException { + String name = job.getName(); + String group = job.getGroup(); + List<? 
extends Trigger> triggers = jobService.getTriggers(name, group); + if (!CollectionUtils.isEmpty(triggers)) { + factory.getScheduler().pauseJob(JobKey.jobKey(name, group)); + } + } + + + private void validateParams(JobSchedule js) { + if (!jobService.isValidJobName(js.getJobName())) { + throw new GriffinException.BadRequestException(INVALID_JOB_NAME); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java b/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java index 7839315..0a7388b 100644 --- a/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java +++ b/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java @@ -19,6 +19,8 @@ under the License. package org.apache.griffin.core.job.entity; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.griffin.core.measure.entity.AbstractAuditableEntity; import javax.persistence.*; @@ -36,14 +38,27 @@ public abstract class AbstractJob extends AbstractAuditableEntity { protected String metricName; + @Column(name = "quartz_job_name") + private String name; + + @Column(name = "quartz_group_name") + private String group; + + @JsonIgnore protected boolean deleted = false; + @OneToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL) + @JoinColumn(name = "job_schedule_id") + private JobSchedule jobSchedule; + AbstractJob() { } - AbstractJob(Long measureId, String jobName, boolean deleted) { + AbstractJob(Long measureId, String jobName, String name, String group, boolean deleted) { this.measureId = measureId; this.jobName = jobName; + this.name = name; + this.group = group; this.deleted = deleted; } @@ -53,26 +68,32 @@ public abstract class 
AbstractJob extends AbstractAuditableEntity { this.metricName = metricName; } + @JsonProperty("job.name") public String getJobName() { return jobName; } + @JsonProperty("job.name") public void setJobName(String jobName) { this.jobName = jobName; } + @JsonProperty("metric.name") public String getMetricName() { return metricName; } + @JsonProperty("metric.name") public void setMetricName(String metricName) { this.metricName = metricName; } + @JsonProperty("measure.id") public Long getMeasureId() { return measureId; } + @JsonProperty("measure.id") public void setMeasureId(Long measureId) { this.measureId = measureId; } @@ -85,4 +106,33 @@ public abstract class AbstractJob extends AbstractAuditableEntity { this.deleted = deleted; } + @JsonProperty("job.config") + public JobSchedule getJobSchedule() { + return jobSchedule; + } + + @JsonProperty("job.config") + public void setJobSchedule(JobSchedule jobSchedule) { + this.jobSchedule = jobSchedule; + } + + @JsonProperty("quartz.name") + public String getName() { + return name; + } + + @JsonProperty("quartz.name") + public void setName(String quartzName) { + this.name = quartzName; + } + + @JsonProperty("quartz.group") + public String getGroup() { + return group; + } + + @JsonProperty("quartz.group") + public void setGroup(String quartzGroup) { + this.group = quartzGroup; + } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/entity/BatchJob.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/BatchJob.java b/service/src/main/java/org/apache/griffin/core/job/entity/BatchJob.java new file mode 100644 index 0000000..54f938d --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/entity/BatchJob.java @@ -0,0 +1,43 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.job.entity; + +import javax.persistence.DiscriminatorValue; +import javax.persistence.Entity; + +@Entity +@DiscriminatorValue("griffinBatchJob") +public class BatchJob extends AbstractJob { + public BatchJob() { + super(); + } + + public BatchJob(Long measureId, String jobName, String name, String group, boolean deleted) { + super(measureId, jobName, name, group, deleted); + this.metricName = jobName; + } + + public BatchJob(Long jobId, Long measureId, String jobName, String qJobName, String qGroupName, boolean deleted) { + this(measureId, jobName, qJobName, qGroupName, deleted); + setId(jobId); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java b/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java deleted file mode 100644 index fe9553c..0000000 --- a/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java +++ /dev/null @@ -1,86 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. 
See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package org.apache.griffin.core.job.entity; - -import javax.persistence.*; -import java.util.ArrayList; -import java.util.List; - -@Entity -@DiscriminatorValue("griffin_job") -public class GriffinJob extends AbstractJob { - - @Column(name = "quartz_job_name") - private String quartzName; - - @Column(name = "quartz_group_name") - private String quartzGroup; - - - @OneToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL) - @JoinColumn(name = "job_schedule_id") - private JobSchedule jobSchedule; - - public String getQuartzName() { - return quartzName; - } - - public void setQuartzName(String quartzName) { - this.quartzName = quartzName; - } - - public String getQuartzGroup() { - return quartzGroup; - } - - public void setQuartzGroup(String quartzGroup) { - this.quartzGroup = quartzGroup; - } - - public JobSchedule getJobSchedule() { - return jobSchedule; - } - - public void setJobSchedule(JobSchedule jobSchedule) { - this.jobSchedule = jobSchedule; - } - - - public GriffinJob() { - super(); - } - - public GriffinJob(Long measureId, String jobName, String quartzName, String quartzGroup, JobSchedule schedule,boolean deleted) { - this(measureId, jobName, quartzName, quartzGroup, deleted); - this.jobSchedule = schedule; - } - - public GriffinJob(Long measureId, String jobName, String quartzName, String 
quartzGroup, boolean deleted) { - super(measureId, jobName, deleted); - this.metricName = jobName; - this.quartzName = quartzName; - this.quartzGroup = quartzGroup; - } - - public GriffinJob(Long jobId, Long measureId, String jobName, String qJobName, String qGroupName, boolean deleted) { - this(measureId, jobName, qJobName, qGroupName, deleted); - setId(jobId); - } -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java index b27ab91..d97dc52 100644 --- a/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java +++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java @@ -20,7 +20,8 @@ under the License. package org.apache.griffin.core.job.entity; -import org.quartz.Trigger; +import com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType; public class JobDataBean { @@ -30,14 +31,19 @@ public class JobDataBean { private Long measureId; - private Trigger.TriggerState triggerState; + private JobState jobState; + @JsonInclude(JsonInclude.Include.NON_NULL) private Long nextFireTime; + @JsonInclude(JsonInclude.Include.NON_NULL) private Long previousFireTime; + @JsonInclude(JsonInclude.Include.NON_NULL) private String cronExpression; + private ProcessType processType; + public Long getJobId() { return jobId; } @@ -62,12 +68,12 @@ public class JobDataBean { this.measureId = measureId; } - public Trigger.TriggerState getTriggerState() { - return triggerState; + public JobState getJobState() { + return jobState; } - public void setTriggerState(Trigger.TriggerState triggerState) { - this.triggerState = triggerState; + public void setJobState(JobState 
jobState) { + this.jobState = jobState; } public Long getNextFireTime() { @@ -94,5 +100,11 @@ public class JobDataBean { this.cronExpression = cronExpression; } + public ProcessType getProcessType() { + return processType; + } + public void setProcessType(ProcessType processType) { + this.processType = processType; + } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java index 89e0e8a..32dba65 100644 --- a/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java +++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java @@ -19,12 +19,17 @@ under the License. package org.apache.griffin.core.job.entity; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.griffin.core.job.entity.LivySessionStates.State; import org.apache.griffin.core.measure.entity.AbstractAuditableEntity; +import org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType; import javax.persistence.*; +import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH; + @Entity public class JobInstanceBean extends AbstractAuditableEntity { @@ -35,9 +40,15 @@ public class JobInstanceBean extends AbstractAuditableEntity { @Enumerated(EnumType.STRING) private State state; + @Enumerated(EnumType.STRING) + private ProcessType type = BATCH; + + /** The application id of this session **/ + @JsonInclude(JsonInclude.Include.NON_NULL) private String appId; @Column(length = 2 * 1024) + @JsonInclude(JsonInclude.Include.NON_NULL) private String appUri; @Column(name = "timestamp") @@ 
-47,17 +58,32 @@ public class JobInstanceBean extends AbstractAuditableEntity { private Long expireTms; @Column(name = "predicate_group_name") + @JsonInclude(JsonInclude.Include.NON_NULL) private String predicateGroup; @Column(name = "predicate_job_name") + @JsonInclude(JsonInclude.Include.NON_NULL) private String predicateName; @Column(name = "predicate_job_deleted") + @JsonIgnore + private boolean predicateDeleted = false; + + @JsonIgnore private boolean deleted = false; @ManyToOne @JoinColumn(name = "job_id",nullable = false) - private GriffinJob griffinJob; + @JsonIgnore + private AbstractJob job; + + public AbstractJob getJob() { + return job; + } + + public void setJob(AbstractJob job) { + this.job = job; + } public Long getSessionId() { return sessionId; @@ -75,6 +101,14 @@ public class JobInstanceBean extends AbstractAuditableEntity { this.state = state; } + public ProcessType getType() { + return type; + } + + public void setType(ProcessType type) { + this.type = type; + } + public String getAppId() { return appId; } @@ -127,6 +161,14 @@ public class JobInstanceBean extends AbstractAuditableEntity { this.predicateName = predicateName; } + public boolean isPredicateDeleted() { + return predicateDeleted; + } + + public void setPredicateDeleted(boolean predicateDeleted) { + this.predicateDeleted = predicateDeleted; + } + public boolean isDeleted() { return deleted; } @@ -135,15 +177,13 @@ public class JobInstanceBean extends AbstractAuditableEntity { this.deleted = deleted; } - public GriffinJob getGriffinJob() { - return griffinJob; - } - - public void setGriffinJob(GriffinJob griffinJob) { - this.griffinJob = griffinJob; + public JobInstanceBean() { } - public JobInstanceBean() { + public JobInstanceBean(State state, Long tms, Long expireTms) { + this.state = state; + this.tms = tms; + this.expireTms = expireTms; } public JobInstanceBean(State state, String pName, String pGroup, Long tms, Long expireTms) { @@ -154,6 +194,16 @@ public class JobInstanceBean 
extends AbstractAuditableEntity { this.expireTms = expireTms; } + public JobInstanceBean(State state, String pName, String pGroup, Long tms, Long expireTms,AbstractJob job) { + this(state, pName, pGroup, tms, expireTms); + this.job = job; + } + + public JobInstanceBean(State state, String pName, String pGroup, Long tms, Long expireTms, ProcessType type) { + this(state, pName, pGroup, tms, expireTms); + this.type = type; + } + public JobInstanceBean(Long sessionId, State state, String appId, String appUri, Long timestamp, Long expireTms) { this.sessionId = sessionId; this.state = state; http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java index c71b6d8..f85af34 100644 --- a/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java +++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java @@ -20,6 +20,7 @@ under the License. 
package org.apache.griffin.core.job.entity; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; @@ -27,7 +28,6 @@ import org.apache.commons.lang.StringUtils; import org.apache.griffin.core.measure.entity.AbstractAuditableEntity; import org.apache.griffin.core.util.JsonUtil; import org.apache.griffin.core.util.PropertiesUtil; -import org.quartz.CronExpression; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.io.ClassPathResource; @@ -48,14 +48,17 @@ public class JobSchedule extends AbstractAuditableEntity { @NotNull private String jobName; - @NotNull + @JsonInclude(JsonInclude.Include.NON_NULL) private String cronExpression; + @Transient + @JsonInclude(JsonInclude.Include.NON_NULL) + private JobState jobState; + @NotNull private String timeZone; @JsonIgnore -// @Access(AccessType.PROPERTY) private String predicateConfig; @Transient @@ -97,13 +100,17 @@ public class JobSchedule extends AbstractAuditableEntity { @JsonProperty("cron.expression") public void setCronExpression(String cronExpression) { - if (StringUtils.isEmpty(cronExpression) || !isCronExpressionValid(cronExpression)) { - LOGGER.warn("Cron expression is invalid.Please check your cron expression."); - throw new IllegalArgumentException(); - } this.cronExpression = cronExpression; } + public JobState getJobState() { + return jobState; + } + + public void setJobState(JobState jobState) { + this.jobState = jobState; + } + @JsonProperty("cron.time.zone") public String getTimeZone() { return timeZone; @@ -174,14 +181,6 @@ public class JobSchedule extends AbstractAuditableEntity { return scheduleConf; } - private boolean isCronExpressionValid(String cronExpression) { - if (!CronExpression.isValidExpression(cronExpression)) { - LOGGER.warn("Cron expression 
{} is invalid.", cronExpression); - return false; - } - return true; - } - public JobSchedule() throws JsonProcessingException { } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/entity/JobState.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobState.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobState.java new file mode 100644 index 0000000..8219af2 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobState.java @@ -0,0 +1,65 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ + +package org.apache.griffin.core.job.entity; + +/** + * Encapsulating job scheduler state to reduce job startup and stop logical processing + */ +public class JobState { + + /** + * job scheduler state + */ + private String state; + + /** + * whether job can be started + */ + private boolean toStart = false; + + /** + * whether job can be stopped + */ + private boolean toStop = false; + + public String getState() { + return state; + } + + public void setState(String state) { + this.state = state; + } + + public boolean isToStart() { + return toStart; + } + + public void setToStart(boolean toStart) { + this.toStart = toStart; + } + + public boolean isToStop() { + return toStop; + } + + public void setToStop(boolean toStop) { + this.toStop = toStop; + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java b/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java index 01e5070..51e06b0 100644 --- a/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java +++ b/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java @@ -20,28 +20,35 @@ under the License. package org.apache.griffin.core.job.entity; import com.cloudera.livy.sessions.SessionState; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import org.quartz.Trigger; + +import static org.apache.griffin.core.job.entity.LivySessionStates.State.*; +import static org.quartz.Trigger.TriggerState; public class LivySessionStates { /** - * unknown is used to represent the state that server get null from Livy. + * UNKNOWN is used to represent the state that server get null from Livy. * the other state is just same as com.cloudera.livy.sessions.SessionState. 
*/ public enum State { - not_started, - starting, - recovering, - idle, - running, - busy, - shutting_down, - error, - dead, - success, - unknown, - finding, - not_found, - found + NOT_STARTED, + STARTING, + RECOVERING, + IDLE, + RUNNING, + BUSY, + SHUTTING_DOWN, + ERROR, + DEAD, + SUCCESS, + UNKNOWN, + STOPPED, + FINDING, + NOT_FOUND, + FOUND } private static SessionState toSessionState(State state) { @@ -49,43 +56,93 @@ public class LivySessionStates { return null; } switch (state) { - case not_started: + case NOT_STARTED: return new SessionState.NotStarted(); - case starting: + case STARTING: return new SessionState.Starting(); - case recovering: + case RECOVERING: return new SessionState.Recovering(); - case idle: + case IDLE: return new SessionState.Idle(); - case running: + case RUNNING: return new SessionState.Running(); - case busy: + case BUSY: return new SessionState.Busy(); - case shutting_down: + case SHUTTING_DOWN: return new SessionState.ShuttingDown(); - case error: + case ERROR: return new SessionState.Error(System.nanoTime()); - case dead: + case DEAD: return new SessionState.Dead(System.nanoTime()); - case success: + case SUCCESS: return new SessionState.Success(System.nanoTime()); default: return null; } } + public static State toLivyState(JsonObject object) { + if (object != null) { + JsonElement state = object.get("state"); + JsonElement finalStatus = object.get("finalStatus"); + State finalState = parseState(state); + return finalState != null ? 
finalState : parseState(finalStatus); + } + return UNKNOWN; + } + + private static State parseState(JsonElement state) { + if (state == null) { + return null; + } + switch (state.getAsString()) { + case "NEW": + case "NEW_SAVING": + case "SUBMITTED": + return NOT_STARTED; + case "ACCEPTED": + return STARTING; + case "RUNNING": + return RUNNING; + case "SUCCEEDED": + return SUCCESS; + case "FAILED": + return DEAD; + case "KILLED": + return SHUTTING_DOWN; + case "FINISHED": + return null; + default: + return UNKNOWN; + } + } + public static boolean isActive(State state) { - if (State.unknown.equals(state) || State.finding.equals(state) || State.not_found.equals(state) || State.found.equals(state)) { - // set unknown isActive() as false. + if (UNKNOWN.equals(state) || STOPPED.equals(state) || NOT_FOUND.equals(state) || FOUND.equals(state)) { + // set UNKNOWN isActive() as false. return false; + } else if (FINDING.equals(state)) { + return true; } SessionState sessionState = toSessionState(state); return sessionState != null && sessionState.isActive(); } + public static String convert2QuartzState(State state) { + SessionState sessionState = toSessionState(state); + if (STOPPED.equals(state) || SUCCESS.equals(state)) { + return "COMPLETE"; + } + if (UNKNOWN.equals(state) || NOT_FOUND.equals(state) || FOUND.equals(state) || sessionState == null || !sessionState.isActive()) { + return "ERROR"; + } + return "NORMAL"; + + } + public static boolean isHealthy(State state) { - return !(State.error.equals(state) || State.dead.equals(state) || - State.shutting_down.equals(state) || State.finding.equals(state) || - State.not_found.equals(state) || State.found.equals(state)); + return !(State.ERROR.equals(state) || State.DEAD.equals(state) || + State.SHUTTING_DOWN.equals(state) || State.FINDING.equals(state) || + State.NOT_FOUND.equals(state) || State.FOUND.equals(state)); } } 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java index dafba89..b3deb31 100644 --- a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java +++ b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java @@ -38,11 +38,10 @@ public class SegmentPredicate extends AbstractAuditableEntity { private String type; @JsonIgnore -// @Access(AccessType.PROPERTY) private String config; @Transient - private Map<String, String> configMap; + private Map<String, Object> configMap; public String getType() { return type; @@ -53,20 +52,20 @@ public class SegmentPredicate extends AbstractAuditableEntity { } @JsonProperty("config") - public Map<String, String> getConfigMap() { + public Map<String, Object> getConfigMap() { return configMap; } @JsonProperty("config") - public void setConfigMap(Map<String, String> configMap) { + public void setConfigMap(Map<String, Object> configMap) { this.configMap = configMap; } - public String getConfig() { + private String getConfig() { return config; } - public void setConfig(String config) { + private void setConfig(String config) { this.config = config; } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/entity/StreamingJob.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/StreamingJob.java b/service/src/main/java/org/apache/griffin/core/job/entity/StreamingJob.java new file mode 100644 index 0000000..70a4acc --- /dev/null +++ 
b/service/src/main/java/org/apache/griffin/core/job/entity/StreamingJob.java @@ -0,0 +1,36 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.job.entity; + +import javax.persistence.DiscriminatorValue; +import javax.persistence.Entity; + +@Entity +@DiscriminatorValue("griffinStreamingJob") +public class StreamingJob extends AbstractJob { + + public StreamingJob() { + } + + public StreamingJob(Long measureId, String jobName, String name, String group, boolean deleted) { + super(measureId, jobName, name, group, deleted); + this.metricName = jobName; + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java b/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java index ad98603..ed2c88d 100644 --- a/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java +++ b/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java @@ -19,9 +19,11 @@ under the License. 
package org.apache.griffin.core.job.entity; +import javax.persistence.DiscriminatorValue; import javax.persistence.Entity; @Entity +@DiscriminatorValue("virtualJob") public class VirtualJob extends AbstractJob { public VirtualJob() { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/repo/BatchJobRepo.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/BatchJobRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/BatchJobRepo.java new file mode 100644 index 0000000..665887e --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/repo/BatchJobRepo.java @@ -0,0 +1,25 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ + +package org.apache.griffin.core.job.repo; + +import org.apache.griffin.core.job.entity.BatchJob; + +public interface BatchJobRepo extends JobRepo<BatchJob> { +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/repo/GriffinJobRepo.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/GriffinJobRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/GriffinJobRepo.java deleted file mode 100644 index aaaa77d..0000000 --- a/service/src/main/java/org/apache/griffin/core/job/repo/GriffinJobRepo.java +++ /dev/null @@ -1,25 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ - -package org.apache.griffin.core.job.repo; - -import org.apache.griffin.core.job.entity.GriffinJob; - -public interface GriffinJobRepo extends JobRepo<GriffinJob> { -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java index d55d27d..868a321 100644 --- a/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java +++ b/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java @@ -31,23 +31,19 @@ import static org.apache.griffin.core.job.entity.LivySessionStates.State; public interface JobInstanceRepo extends CrudRepository<JobInstanceBean, Long> { -// @Query("select DISTINCT s from JobInstanceBean s " + -// "where s.state in ('starting', 'not_started', 'recovering', 'idle', 'running', 'busy')") -// List<JobInstanceBean> findByActiveState(); - JobInstanceBean findByPredicateName(String name); - @Query("select s from JobInstanceBean s where s.griffinJob.id = ?1") + @Query("select s from JobInstanceBean s where s.job.id = ?1") List<JobInstanceBean> findByJobId(Long jobId, Pageable pageable); - @Query("select s from JobInstanceBean s where s.griffinJob.id = ?1") + @Query("select s from JobInstanceBean s where s.job.id = ?1") List<JobInstanceBean> findByJobId(Long jobId); List<JobInstanceBean> findByExpireTmsLessThanEqual(Long expireTms); @Transactional(rollbackFor = Exception.class) @Modifying - @Query("delete from JobInstanceBean j where j.expireTms <= ?1") + @Query("delete from JobInstanceBean j where j.expireTms <= ?1 and j.deleted = false ") int deleteByExpireTimestamp(Long expireTms); @Query("select DISTINCT s from JobInstanceBean s " + 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/repo/StreamingJobRepo.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/StreamingJobRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/StreamingJobRepo.java new file mode 100644 index 0000000..a6b198b --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/repo/StreamingJobRepo.java @@ -0,0 +1,6 @@ +package org.apache.griffin.core.job.repo; + +import org.apache.griffin.core.job.entity.StreamingJob; + +public interface StreamingJobRepo extends JobRepo<StreamingJob> { +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java b/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java deleted file mode 100644 index bb76c09..0000000 --- a/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java +++ /dev/null @@ -1,80 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. 
See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package org.apache.griffin.core.measure; - -import org.apache.griffin.core.job.entity.VirtualJob; -import org.apache.griffin.core.job.repo.VirtualJobRepo; -import org.apache.griffin.core.measure.entity.ExternalMeasure; -import org.apache.griffin.core.measure.entity.Measure; -import org.apache.griffin.core.measure.repo.ExternalMeasureRepo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; - -import static org.apache.griffin.core.util.MeasureUtil.validateMeasure; - -@Component("externalOperation") -public class ExternalMeasureOperationImpl implements MeasureOperation { - private static final Logger LOGGER = LoggerFactory.getLogger(ExternalMeasureOperationImpl.class); - - @Autowired - private ExternalMeasureRepo measureRepo; - @Autowired - private VirtualJobRepo jobRepo; - - @Override - @Transactional - public Measure create(Measure measure) { - ExternalMeasure em = (ExternalMeasure) measure; - validateMeasure(em); - em.setVirtualJob(new VirtualJob()); - em = measureRepo.save(em); - VirtualJob vj = genVirtualJob(em, em.getVirtualJob()); - jobRepo.save(vj); - return em; - } - - @Override - public void update(Measure measure) { - ExternalMeasure latestMeasure = (ExternalMeasure) measure; - validateMeasure(latestMeasure); - ExternalMeasure originMeasure = measureRepo.findOne(latestMeasure.getId()); - VirtualJob vj = genVirtualJob(latestMeasure, originMeasure.getVirtualJob()); - latestMeasure.setVirtualJob(vj); - measureRepo.save(latestMeasure); - } - - @Override - public void delete(Measure measure) { - ExternalMeasure em = (ExternalMeasure) measure; - em.setDeleted(true); - em.getVirtualJob().setDeleted(true); - measureRepo.save(em); - } - - private VirtualJob 
genVirtualJob(ExternalMeasure em, VirtualJob vj) { - vj.setMeasureId(em.getId()); - vj.setJobName(em.getName()); - vj.setMetricName(em.getMetricName()); - return vj; - } -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperatorImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperatorImpl.java b/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperatorImpl.java new file mode 100644 index 0000000..1a5068c --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperatorImpl.java @@ -0,0 +1,81 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ + +package org.apache.griffin.core.measure; + +import org.apache.griffin.core.job.entity.VirtualJob; +import org.apache.griffin.core.job.repo.VirtualJobRepo; +import org.apache.griffin.core.measure.entity.ExternalMeasure; +import org.apache.griffin.core.measure.entity.Measure; +import org.apache.griffin.core.measure.repo.ExternalMeasureRepo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import static org.apache.griffin.core.util.MeasureUtil.validateMeasure; + +@Component("externalOperation") +public class ExternalMeasureOperatorImpl implements MeasureOperator { + private static final Logger LOGGER = LoggerFactory.getLogger(ExternalMeasureOperatorImpl.class); + + @Autowired + private ExternalMeasureRepo measureRepo; + @Autowired + private VirtualJobRepo jobRepo; + + @Override + @Transactional + public Measure create(Measure measure) { + ExternalMeasure em = (ExternalMeasure) measure; + validateMeasure(em); + em.setVirtualJob(new VirtualJob()); + em = measureRepo.save(em); + VirtualJob vj = genVirtualJob(em, em.getVirtualJob()); + jobRepo.save(vj); + return em; + } + + @Override + public Measure update(Measure measure) { + ExternalMeasure latestMeasure = (ExternalMeasure) measure; + validateMeasure(latestMeasure); + ExternalMeasure originMeasure = measureRepo.findOne(latestMeasure.getId()); + VirtualJob vj = genVirtualJob(latestMeasure, originMeasure.getVirtualJob()); + latestMeasure.setVirtualJob(vj); + measure = measureRepo.save(latestMeasure); + return measure; + } + + @Override + public void delete(Measure measure) { + ExternalMeasure em = (ExternalMeasure) measure; + em.setDeleted(true); + em.getVirtualJob().setDeleted(true); + measureRepo.save(em); + } + + private VirtualJob genVirtualJob(ExternalMeasure em, VirtualJob vj) { + vj.setMeasureId(em.getId()); + 
vj.setJobName(em.getName()); + vj.setMetricName(em.getMetricName()); + return vj; + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java b/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java deleted file mode 100644 index c15ff23..0000000 --- a/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java +++ /dev/null @@ -1,66 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ - -package org.apache.griffin.core.measure; - -import org.apache.griffin.core.job.JobServiceImpl; -import org.apache.griffin.core.measure.entity.Measure; -import org.apache.griffin.core.measure.repo.MeasureRepo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import static org.apache.griffin.core.util.MeasureUtil.validateMeasure; - -@Component("griffinOperation") -public class GriffinMeasureOperationImpl implements MeasureOperation { - private static final Logger LOGGER = LoggerFactory.getLogger(GriffinMeasureOperationImpl.class); - - private final MeasureRepo<Measure> measureRepo; - - private final JobServiceImpl jobService; - - @Autowired - public GriffinMeasureOperationImpl(MeasureRepo<Measure> measureRepo, JobServiceImpl jobService) { - this.measureRepo = measureRepo; - this.jobService = jobService; - } - - - @Override - public Measure create(Measure measure) { - validateMeasure(measure); - return measureRepo.save(measure); - } - - @Override - public void update(Measure measure) { - validateMeasure(measure); - measure.setDeleted(false); - measureRepo.save(measure); - } - - @Override - public void delete(Measure measure) { - jobService.deleteJobsRelateToMeasure(measure.getId()); - measure.setDeleted(true); - measureRepo.save(measure); - } -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperatorImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperatorImpl.java b/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperatorImpl.java new file mode 100644 index 0000000..9938e0e --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperatorImpl.java @@ -0,0 +1,68 @@ +/* +Licensed to the 
Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.measure; + +import org.apache.griffin.core.job.JobServiceImpl; +import org.apache.griffin.core.measure.entity.Measure; +import org.apache.griffin.core.measure.repo.MeasureRepo; +import org.quartz.SchedulerException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import static org.apache.griffin.core.util.MeasureUtil.validateMeasure; + +@Component("griffinOperation") +public class GriffinMeasureOperatorImpl implements MeasureOperator { + private static final Logger LOGGER = LoggerFactory.getLogger(GriffinMeasureOperatorImpl.class); + + private final MeasureRepo<Measure> measureRepo; + + private final JobServiceImpl jobService; + + @Autowired + public GriffinMeasureOperatorImpl(MeasureRepo<Measure> measureRepo, JobServiceImpl jobService) { + this.measureRepo = measureRepo; + this.jobService = jobService; + } + + + @Override + public Measure create(Measure measure) { + validateMeasure(measure); + return measureRepo.save(measure); + } + + @Override + public Measure update(Measure measure) { + validateMeasure(measure); + measure.setDeleted(false); + measure = 
measureRepo.save(measure); + return measure; + } + + @Override + public void delete(Measure measure) throws SchedulerException { + jobService.deleteJobsRelateToMeasure(measure.getId()); + measure.setDeleted(true); + measureRepo.save(measure); + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java index d7917e4..98d2211 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java +++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java @@ -20,10 +20,12 @@ under the License. package org.apache.griffin.core.measure; import org.apache.griffin.core.measure.entity.Measure; +import org.quartz.SchedulerException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.*; +import javax.validation.Valid; import java.util.List; @RestController @@ -44,24 +46,24 @@ public class MeasureController { @RequestMapping(value = "/measures/{id}", method = RequestMethod.DELETE) @ResponseStatus(HttpStatus.NO_CONTENT) - public void deleteMeasureById(@PathVariable("id") Long id) { + public void deleteMeasureById(@PathVariable("id") Long id) throws SchedulerException { measureService.deleteMeasureById(id); } @RequestMapping(value = "/measures", method = RequestMethod.DELETE) @ResponseStatus(HttpStatus.NO_CONTENT) - public void deleteMeasures() { + public void deleteMeasures() throws SchedulerException { measureService.deleteMeasures(); } @RequestMapping(value = "/measures", method = RequestMethod.PUT) - @ResponseStatus(HttpStatus.NO_CONTENT) - public void updateMeasure(@RequestBody Measure measure) { - 
measureService.updateMeasure(measure); + @ResponseStatus(HttpStatus.OK) + public Measure updateMeasure(@RequestBody Measure measure) { + return measureService.updateMeasure(measure); } @RequestMapping(value = "/measures/owner/{owner}", method = RequestMethod.GET) - public List<Measure> getAliveMeasuresByOwner(@PathVariable("owner") String owner) { + public List<Measure> getAliveMeasuresByOwner(@PathVariable("owner") @Valid String owner) { return measureService.getAliveMeasuresByOwner(owner); } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java deleted file mode 100644 index 1ebc06c..0000000 --- a/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java +++ /dev/null @@ -1,33 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ - -package org.apache.griffin.core.measure; - - -import org.apache.griffin.core.measure.entity.Measure; - -public interface MeasureOperation { - - Measure create(Measure measure); - - void update(Measure measure); - - void delete(Measure measure); - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/measure/MeasureOperator.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureOperator.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureOperator.java new file mode 100644 index 0000000..42a361a --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureOperator.java @@ -0,0 +1,34 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ + +package org.apache.griffin.core.measure; + + +import org.apache.griffin.core.measure.entity.Measure; +import org.quartz.SchedulerException; + +public interface MeasureOperator { + + Measure create(Measure measure); + + Measure update(Measure measure); + + void delete(Measure measure) throws SchedulerException; + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java index 37942bf..d422ce6 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java +++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java @@ -21,6 +21,7 @@ package org.apache.griffin.core.measure; import org.apache.griffin.core.measure.entity.Measure; +import org.quartz.SchedulerException; import java.util.List; @@ -30,11 +31,11 @@ public interface MeasureService { Measure getMeasureById(long id); - void deleteMeasureById(Long id); + void deleteMeasureById(Long id) throws SchedulerException; - void deleteMeasures(); + void deleteMeasures() throws SchedulerException; - void updateMeasure(Measure measure); + Measure updateMeasure(Measure measure); List<Measure> getAliveMeasuresByOwner(String owner); http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java index bd24be3..a983f01 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java +++ 
b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java @@ -21,11 +21,13 @@ package org.apache.griffin.core.measure; import org.apache.griffin.core.exception.GriffinException; +import org.apache.griffin.core.measure.entity.ExternalMeasure; import org.apache.griffin.core.measure.entity.GriffinMeasure; import org.apache.griffin.core.measure.entity.Measure; import org.apache.griffin.core.measure.repo.ExternalMeasureRepo; import org.apache.griffin.core.measure.repo.GriffinMeasureRepo; import org.apache.griffin.core.measure.repo.MeasureRepo; +import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -40,6 +42,8 @@ import static org.apache.griffin.core.exception.GriffinExceptionMessage.*; @Service public class MeasureServiceImpl implements MeasureService { private static final Logger LOGGER = LoggerFactory.getLogger(MeasureServiceImpl.class); + private static final String GRIFFIN = "griffin"; + private static final String EXTERNAL = "external"; @Autowired private MeasureRepo<Measure> measureRepo; @@ -49,16 +53,16 @@ public class MeasureServiceImpl implements MeasureService { private ExternalMeasureRepo externalMeasureRepo; @Autowired @Qualifier("griffinOperation") - private MeasureOperation griffinOp; + private MeasureOperator griffinOp; @Autowired @Qualifier("externalOperation") - private MeasureOperation externalOp; + private MeasureOperator externalOp; @Override public List<? 
extends Measure> getAllAliveMeasures(String type) { - if (type.equals("griffin")) { + if (type.equals(GRIFFIN)) { return griffinMeasureRepo.findByDeleted(false); - } else if (type.equals("external")) { + } else if (type.equals(EXTERNAL)) { return externalMeasureRepo.findByDeleted(false); } return measureRepo.findByDeleted(false); @@ -85,12 +89,12 @@ public class MeasureServiceImpl implements MeasureService { LOGGER.warn("Failed to create new measure {}, it already exists.", measure.getName()); throw new GriffinException.ConflictException(MEASURE_NAME_ALREADY_EXIST); } - MeasureOperation op = getOperation(measure); + MeasureOperator op = getOperation(measure); return op.create(measure); } @Override - public void updateMeasure(Measure measure) { + public Measure updateMeasure(Measure measure) { Measure m = measureRepo.findByIdAndDeleted(measure.getId(), false); if (m == null) { throw new GriffinException.NotFoundException(MEASURE_ID_DOES_NOT_EXIST); @@ -99,34 +103,36 @@ public class MeasureServiceImpl implements MeasureService { LOGGER.warn("Can't update measure to different type."); throw new GriffinException.BadRequestException(MEASURE_TYPE_DOES_NOT_MATCH); } - MeasureOperation op = getOperation(measure); - op.update(measure); + MeasureOperator op = getOperation(measure); + return op.update(measure); } @Override - public void deleteMeasureById(Long measureId) { + public void deleteMeasureById(Long measureId) throws SchedulerException { Measure measure = measureRepo.findByIdAndDeleted(measureId, false); if (measure == null) { throw new GriffinException.NotFoundException(MEASURE_ID_DOES_NOT_EXIST); } - MeasureOperation op = getOperation(measure); + MeasureOperator op = getOperation(measure); op.delete(measure); } @Override - public void deleteMeasures() { + public void deleteMeasures() throws SchedulerException { List<Measure> measures = measureRepo.findByDeleted(false); for (Measure m : measures) { - MeasureOperation op = getOperation(m); + MeasureOperator op = 
getOperation(m); op.delete(m); } } - private MeasureOperation getOperation(Measure measure) { + private MeasureOperator getOperation(Measure measure) { if (measure instanceof GriffinMeasure) { return griffinOp; + } else if (measure instanceof ExternalMeasure) { + return externalOp; } - return externalOp; + throw new GriffinException.BadRequestException(MEASURE_TYPE_DOES_NOT_SUPPORT); } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java index 532cb48..916df33 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java +++ b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java @@ -28,6 +28,7 @@ import org.apache.griffin.core.job.entity.SegmentPredicate; import org.apache.griffin.core.util.JsonUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import javax.persistence.*; @@ -43,10 +44,20 @@ public class DataConnector extends AbstractAuditableEntity { private final static Logger LOGGER = LoggerFactory.getLogger(DataConnector.class); + public enum DataType { + /** + * There are three data source type which we support now. 
+ */ + HIVE, + KAFKA, + AVRO + } + @NotNull private String name; - private String type; + @Enumerated(EnumType.STRING) + private DataType type; private String version; @@ -61,16 +72,20 @@ public class DataConnector extends AbstractAuditableEntity { private String defaultDataUnit = "365000d"; @JsonIgnore -// @Access(AccessType.PROPERTY) private String config; @Transient - private Map<String, String> configMap; + private Map<String, Object> configMap; @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE}) @JoinColumn(name = "data_connector_id") private List<SegmentPredicate> predicates = new ArrayList<>(); + @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE}) + @JoinColumn(name = "pre_process_id") + @JsonInclude(JsonInclude.Include.NON_NULL) + private List<StreamingPreProcess> preProcess; + public List<SegmentPredicate> getPredicates() { return predicates; } @@ -79,21 +94,31 @@ public class DataConnector extends AbstractAuditableEntity { this.predicates = predicates; } + @JsonProperty("pre.proc") + public List<StreamingPreProcess> getPreProcess() { + return CollectionUtils.isEmpty(preProcess) ? 
null : preProcess; + } + + @JsonProperty("pre.proc") + public void setPreProcess(List<StreamingPreProcess> preProcess) { + this.preProcess = preProcess; + } + @JsonProperty("config") - public Map<String, String> getConfigMap() { + public Map<String, Object> getConfigMap() { return configMap; } @JsonProperty("config") - public void setConfigMap(Map<String, String> configMap) { + public void setConfigMap(Map<String, Object> configMap) { this.configMap = configMap; } - public void setConfig(String config) { + private void setConfig(String config) { this.config = config; } - public String getConfig() { + private String getConfig() { return config; } @@ -137,11 +162,11 @@ public class DataConnector extends AbstractAuditableEntity { this.name = name; } - public String getType() { + public DataType getType() { return type; } - public void setType(String type) { + public void setType(DataType type) { this.type = type; } @@ -163,7 +188,7 @@ public class DataConnector extends AbstractAuditableEntity { @PostLoad public void load() throws IOException { - if (!org.springframework.util.StringUtils.isEmpty(config)) { + if (!StringUtils.isEmpty(config)) { this.configMap = JsonUtil.toEntity(config, new TypeReference<Map<String, Object>>() { }); } @@ -173,16 +198,16 @@ public class DataConnector extends AbstractAuditableEntity { public DataConnector() { } - public DataConnector(String name, String type, String version, String config) throws IOException { + public DataConnector(String name, DataType type, String version, String config) throws IOException { this.name = name; this.type = type; this.version = version; this.config = config; - this.configMap = JsonUtil.toEntity(config, new TypeReference<Map<String, String>>() { + this.configMap = JsonUtil.toEntity(config, new TypeReference<Map<String, Object>>() { }); } - public DataConnector(String name, String dataUnit, Map<String,String> configMap, List<SegmentPredicate> predicates) { + public DataConnector(String name, String 
dataUnit, Map configMap, List<SegmentPredicate> predicates) { this.name = name; this.dataUnit = dataUnit; this.configMap = configMap;
