http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java new file mode 100644 index 0000000..131fe03 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java @@ -0,0 +1,188 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.job.entity; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.commons.lang.StringUtils; +import org.apache.griffin.core.measure.entity.AbstractAuditableEntity; +import org.apache.griffin.core.util.JsonUtil; +import org.apache.griffin.core.util.PropertiesUtil; +import org.quartz.CronExpression; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Configurable; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.core.io.ClassPathResource; +import org.springframework.stereotype.Component; + +import javax.persistence.*; +import javax.validation.constraints.NotNull; +import java.io.IOException; +import java.util.*; + +@Configurable(preConstruction = true) +@Component +@Entity +public class JobSchedule extends AbstractAuditableEntity { + + private static final Logger LOGGER = LoggerFactory.getLogger(JobSchedule.class); + + @NotNull + private Long measureId; + + @NotNull + private String jobName; + + @NotNull + private String cronExpression; + + @NotNull + private String timeZone; + + @JsonIgnore + @Access(AccessType.PROPERTY) + private String predicateConfig; + + @Transient + private Map<String, Object> configMap = defaultPredicatesConfig(); + + @NotNull + @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE}) + @JoinColumn(name = "job_schedule_id") + private List<JobDataSegment> segments = new ArrayList<>(); + + @JsonProperty("measure.id") + public Long getMeasureId() { + return measureId; + } + + @JsonProperty("measure.id") + public void setMeasureId(Long measureId) { + this.measureId = measureId; + } + + @JsonProperty("job.name") + public String getJobName() { + return jobName; + } + + @JsonProperty("job.name") + public void setJobName(String jobName) { + if (StringUtils.isEmpty(jobName)) { + LOGGER.error("Job name cannot be empty."); + throw new NullPointerException(); + } + this.jobName = jobName; + } + + @JsonProperty("cron.expression") + public String getCronExpression() { + return cronExpression; + } + + @JsonProperty("cron.expression") + public void setCronExpression(String cronExpression) { + if (StringUtils.isEmpty(cronExpression) || !isCronExpressionValid(cronExpression)) { + LOGGER.error("Cron expression is invalid.Please check your cron expression."); + throw new IllegalArgumentException(); + } + this.cronExpression = cronExpression; + } + + @JsonProperty("cron.time.zone") + public String getTimeZone() { + return timeZone; + } + + @JsonProperty("cron.time.zone") + public void setTimeZone(String timeZone) { + this.timeZone = timeZone; + } + + @JsonProperty("data.segments") + public List<JobDataSegment> getSegments() { + return segments; + } + + @JsonProperty("data.segments") + public void setSegments(List<JobDataSegment> segments) { + this.segments = segments; + } + + private String getPredicateConfig() { + return predicateConfig; + } + + private void setPredicateConfig(String config) throws IOException { + this.predicateConfig = config; + this.configMap = JsonUtil.toEntity(config, new TypeReference<Map<String, Object>>() { + }); + } + + @JsonProperty("predicate.config") + public Map<String, Object> getConfigMap() throws IOException { + return configMap; + } + + @JsonProperty("predicate.config") + private void setConfigMap(Map<String, Object> configMap) throws JsonProcessingException { + this.configMap = configMap; + this.predicateConfig = JsonUtil.toJson(configMap); + } + + /** + * @return set default predicate config + * @throws JsonProcessingException json exception + */ + private Map<String, Object> defaultPredicatesConfig() throws JsonProcessingException { + String path = "/application.properties"; + Properties appConf = PropertiesUtil.getProperties(path,new ClassPathResource(path)); + Map<String, Object> scheduleConf = new HashMap<>(); + Map<String, Object> map = new HashMap<>(); + map.put("interval", appConf.getProperty("predicate.job.interval")); + map.put("repeat", appConf.getProperty("predicate.job.repeat.count")); + scheduleConf.put("checkdonefile.schedule", map); + setConfigMap(scheduleConf); + return scheduleConf; + } + + private boolean isCronExpressionValid(String cronExpression) { + if (!CronExpression.isValidExpression(cronExpression)) { + LOGGER.error("Cron expression {} is invalid.", cronExpression); + return false; + } + return true; + } + + public JobSchedule() throws JsonProcessingException { + } + + public JobSchedule(Long measureId, String jobName, String cronExpression, Map configMap, List<JobDataSegment> segments) throws JsonProcessingException { + this.measureId = measureId; + this.jobName = jobName; + this.cronExpression = cronExpression; + setConfigMap(configMap); + this.segments = segments; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/LivyConf.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/LivyConf.java b/service/src/main/java/org/apache/griffin/core/job/entity/LivyConf.java new file mode 100644 index 0000000..208fa8c --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/entity/LivyConf.java @@ -0,0 +1,148 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.job.entity; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +public class LivyConf implements Serializable { + + private String file; + + private String className; + + private List<String> args; + + private String name; + + private String queue; + + private Long numExecutors; + + private Long executorCores; + + private String driverMemory; + + private String executorMemory; + + private Map<String, String> conf; + + private List<String> jars; + + private List<String> files; + + public String getFile() { + return file; + } + + public void setFile(String file) { + this.file = file; + } + + public String getClassName() { + return className; + } + + public void setClassName(String className) { + this.className = className; + } + + public List<String> getArgs() { + return args; + } + + public void setArgs(List<String> args) { + this.args = args; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getQueue() { + return queue; + } + + public void setQueue(String queue) { + this.queue = queue; + } + + public Long getNumExecutors() { + return numExecutors; + } + + public void setNumExecutors(Long numExecutors) { + this.numExecutors = numExecutors; + } + + public Long getExecutorCores() { + return executorCores; + } + + public void setExecutorCores(Long executorCores) { + this.executorCores = executorCores; + } + + public String getDriverMemory() { + return driverMemory; + } + + public void setDriverMemory(String driverMemory) { + this.driverMemory = driverMemory; + } + + public String getExecutorMemory() { + return executorMemory; + } + + public void setExecutorMemory(String executorMemory) { + this.executorMemory = executorMemory; + } + + public Map<String, String> getConf() { + return conf; + } + + public void setConf(Map<String, String> conf) { + this.conf = conf; + } + + public List<String> getJars() { + return jars; + } + + public void setJars(List<String> jars) { + this.jars = jars; + } + + public List<String> getFiles() { + return files; + } + + public void setFiles(List<String> files) { + this.files = files; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java b/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java index 773bd98..01e5070 100644 --- a/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java +++ b/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java @@ -38,10 +38,16 @@ public class LivySessionStates { error, dead, success, - unknown + unknown, + finding, + not_found, + found } - public static SessionState toSessionState(State state) { + private static SessionState toSessionState(State state) { + if (state == null) { + return null; + } switch (state) { case not_started: return new SessionState.NotStarted(); @@ -69,22 +75,17 @@ public class LivySessionStates { } public static boolean isActive(State state) { - if (State.unknown.equals(state)) { - // set unknown isactive() as false. + if (State.unknown.equals(state) || State.finding.equals(state) || State.not_found.equals(state) || State.found.equals(state)) { + // set unknown isActive() as false. return false; } SessionState sessionState = toSessionState(state); - if (sessionState == null) { - return false; - } else { - return sessionState.isActive(); - } + return sessionState != null && sessionState.isActive(); } public static boolean isHealthy(State state) { - if (State.error.equals(state) || State.dead.equals(state) || State.shutting_down.equals(state)) { - return false; - } - return true; + return !(State.error.equals(state) || State.dead.equals(state) || + State.shutting_down.equals(state) || State.finding.equals(state) || + State.not_found.equals(state) || State.found.equals(state)); } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java new file mode 100644 index 0000000..0f5a624 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java @@ -0,0 +1,81 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + + +package org.apache.griffin.core.job.entity; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.commons.lang.StringUtils; +import org.apache.griffin.core.measure.entity.AbstractAuditableEntity; +import org.apache.griffin.core.util.JsonUtil; + +import javax.persistence.Access; +import javax.persistence.AccessType; +import javax.persistence.Entity; +import javax.persistence.Transient; +import java.io.IOException; +import java.util.Map; + +@Entity +public class SegmentPredicate extends AbstractAuditableEntity { + + private String type; + + @JsonIgnore + @Access(AccessType.PROPERTY) + private String config; + + @Transient + private Map<String, String> configMap; + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getConfig() { + return config; + } + + public void setConfig(String config) throws IOException { + this.config = config; + this.configMap = JsonUtil.toEntity(config, new TypeReference<Map<String, String>>() { + }); + } + + @JsonProperty("config") + public Map<String, String> getConfigMap() throws IOException { + return configMap; + } + + @JsonProperty("config") + public void setConfigMap(Map<String, String> configMap) throws JsonProcessingException { + this.configMap = configMap; + this.config = JsonUtil.toJson(configMap); + } + + public SegmentPredicate() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java new file mode 100644 index 0000000..b8ca5cf --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java @@ -0,0 +1,53 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + + +package org.apache.griffin.core.job.entity; + +import org.apache.griffin.core.measure.entity.AbstractAuditableEntity; + +import javax.persistence.Column; +import javax.persistence.Entity; + +@Entity +public class SegmentRange extends AbstractAuditableEntity { + + @Column(name = "data_begin") + private String begin = "1h"; + + private String length = "1h"; + + + public String getBegin() { + return begin; + } + + public void setBegin(String begin) { + this.begin = begin; + } + + public String getLength() { + return length; + } + + public void setLength(String length) { + this.length = length; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/SparkJobDO.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/SparkJobDO.java b/service/src/main/java/org/apache/griffin/core/job/entity/SparkJobDO.java deleted file mode 100644 index b5925f6..0000000 --- a/service/src/main/java/org/apache/griffin/core/job/entity/SparkJobDO.java +++ /dev/null @@ -1,148 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package org.apache.griffin.core.job.entity; - -import java.io.Serializable; -import java.util.List; -import java.util.Map; - -public class SparkJobDO implements Serializable { - - private String file; - - private String className; - - private List<String> args; - - private String name; - - private String queue; - - private Long numExecutors; - - private Long executorCores; - - private String driverMemory; - - private String executorMemory; - - private Map<String, String> conf; - - private List<String> jars; - - private List<String> files; - - public String getFile() { - return file; - } - - public void setFile(String file) { - this.file = file; - } - - public String getClassName() { - return className; - } - - public void setClassName(String className) { - this.className = className; - } - - public List<String> getArgs() { - return args; - } - - public void setArgs(List<String> args) { - this.args = args; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getQueue() { - return queue; - } - - public void setQueue(String queue) { - this.queue = queue; - } - - public Long getNumExecutors() { - return numExecutors; - } - - public void setNumExecutors(Long numExecutors) { - this.numExecutors = numExecutors; - } - - public Long getExecutorCores() { - return executorCores; - } - - public void setExecutorCores(Long executorCores) { - this.executorCores = executorCores; - } - - public String getDriverMemory() { - return driverMemory; - } - - public void setDriverMemory(String driverMemory) { - this.driverMemory = driverMemory; - } - - public String getExecutorMemory() { - return executorMemory; - } - - public void setExecutorMemory(String executorMemory) { - this.executorMemory = executorMemory; - } - - public Map<String, String> getConf() { - return conf; - } - - public void setConf(Map<String, String> conf) { - this.conf = conf; - } - - public List<String> getJars() { - return jars; - } - - public void setJars(List<String> jars) { - this.jars = jars; - } - - public List<String> getFiles() { - return files; - } - - public void setFiles(List<String> files) { - this.files = files; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java b/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java new file mode 100644 index 0000000..ad98603 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java @@ -0,0 +1,34 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.job.entity; + +import javax.persistence.Entity; + +@Entity +public class VirtualJob extends AbstractJob { + + public VirtualJob() { + super(); + } + + public VirtualJob(String jobName, Long measureId, String metricName) { + super(jobName, measureId, metricName); + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/factory/AutowiringSpringBeanJobFactory.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/factory/AutowiringSpringBeanJobFactory.java b/service/src/main/java/org/apache/griffin/core/job/factory/AutowiringSpringBeanJobFactory.java new file mode 100644 index 0000000..cc2ff15 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/factory/AutowiringSpringBeanJobFactory.java @@ -0,0 +1,54 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.job.factory; + +import org.quartz.spi.TriggerFiredBundle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.config.AutowireCapableBeanFactory; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.scheduling.quartz.SpringBeanJobFactory; + +public final class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory + implements ApplicationContextAware { + private static final Logger LOGGER = LoggerFactory.getLogger(AutowiringSpringBeanJobFactory.class); + + private transient AutowireCapableBeanFactory beanFactory; + + @Override + public void setApplicationContext(final ApplicationContext context) { + beanFactory = context.getAutowireCapableBeanFactory(); + } + + @Override + protected Object createJobInstance(final TriggerFiredBundle bundle) { + + try { + final Object job = super.createJobInstance(bundle); + beanFactory.autowireBean(job); + return job; + + } catch (Exception e) { + LOGGER.error("fail to create job instance. {}", e.getMessage()); + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java b/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java new file mode 100644 index 0000000..8af39f4 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java @@ -0,0 +1,38 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.job.factory; + +import org.apache.griffin.core.job.FileExistPredicator; +import org.apache.griffin.core.job.Predicator; +import org.apache.griffin.core.job.entity.SegmentPredicate; + +public class PredicatorFactory { + public static Predicator newPredicateInstance(SegmentPredicate segPredicate) { + Predicator predicate = null; + switch (segPredicate.getType()) { + case "file.exist": + predicate = new FileExistPredicator(segPredicate); + break; + default: + break; + } + return predicate; + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/repo/GriffinJobRepo.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/GriffinJobRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/GriffinJobRepo.java new file mode 100644 index 0000000..aaaa77d --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/repo/GriffinJobRepo.java @@ -0,0 +1,25 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.job.repo; + +import org.apache.griffin.core.job.entity.GriffinJob; + +public interface GriffinJobRepo extends JobRepo<GriffinJob> { +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/repo/JobDataSegmentRepo.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/JobDataSegmentRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/JobDataSegmentRepo.java new file mode 100644 index 0000000..48dd3b4 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/repo/JobDataSegmentRepo.java @@ -0,0 +1,26 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.job.repo; + +import org.apache.griffin.core.job.entity.JobDataSegment; +import org.springframework.data.repository.CrudRepository; + +public interface JobDataSegmentRepo extends CrudRepository<JobDataSegment, Long> { +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java index 610d282..1714789 100644 --- a/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java +++ b/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java @@ -18,45 +18,31 @@ under the License. */ package org.apache.griffin.core.job.repo; -import org.apache.griffin.core.job.entity.JobInstance; -import org.apache.griffin.core.job.entity.LivySessionStates; +import org.apache.griffin.core.job.entity.JobInstanceBean; import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.CrudRepository; -import org.springframework.stereotype.Repository; +import org.springframework.transaction.annotation.Transactional; import java.util.List; +public interface JobInstanceRepo extends CrudRepository<JobInstanceBean, Long> { -@Repository -public interface JobInstanceRepo extends CrudRepository<JobInstance, Long> { - /** - * @param group is group name - * @param name is job name - * @param pageable page info - * @return all job instances scheduled at different time using the same prototype job, - * the prototype job is determined by SCHED_NAME, group name and job name in table QRTZ_JOB_DETAILS. - */ - @Query("select s from JobInstance s " + - "where s.groupName= ?1 and s.jobName=?2 ") - List<JobInstance> findByGroupNameAndJobName(String group, String name, Pageable pageable); + @Query("select DISTINCT s from JobInstanceBean s " + + "where s.state in ('starting', 'not_started', 'recovering', 'idle', 'running', 'busy')") + List<JobInstanceBean> findByActiveState(); - @Query("select s from JobInstance s " + - "where s.groupName= ?1 and s.jobName=?2 ") - List<JobInstance> findByGroupNameAndJobName(String group, String name); + JobInstanceBean findByPredicateName(String name); - @Query("select DISTINCT s.groupName, s.jobName from JobInstance s") - List<Object> findGroupWithJobName(); + @Query("select s from JobInstanceBean s where job_id = ?1") + List<JobInstanceBean> findByJobId(Long jobId, Pageable pageable); - @Modifying - @Query("delete from JobInstance s " + - "where s.groupName= ?1 and s.jobName=?2 ") - void deleteByGroupAndJobName(String groupName, String jobName); + List<JobInstanceBean> findByExpireTmsLessThanEqual(Long expireTms); + @Transactional @Modifying - @Query("update JobInstance s " + - "set s.state= ?2, s.appId= ?3, s.appUri= ?4 where s.id= ?1") - void update(Long id, LivySessionStates.State state, String appId, String appUri); + @Query("delete from JobInstanceBean j where j.expireTms <= ?1") + int deleteByExpireTimestamp(Long expireTms); } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/repo/JobRepo.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/JobRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/JobRepo.java new file mode 100644 index 0000000..a3fcce3 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/repo/JobRepo.java @@ -0,0 +1,40 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.job.repo; + +import org.apache.griffin.core.job.entity.AbstractJob; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.CrudRepository; + +import java.util.List; + +public interface JobRepo<T extends AbstractJob> extends CrudRepository<T, Long> { + + @Query("select count(j) from #{#entityName} j where j.jobName = ?1 and j.deleted = ?2") + int countByJobNameAndDeleted(String jobName, Boolean deleted); + + List<T> findByDeleted(boolean deleted); + + List<T> findByJobNameAndDeleted(String jobName, boolean deleted); + + List<T> findByMeasureIdAndDeleted(Long measureId, boolean deleted); + + T findByIdAndDeleted(Long jobId, boolean deleted); +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java new file mode 100644 index 0000000..1b360e4 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java @@ -0,0 +1,26 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.job.repo; + +import org.apache.griffin.core.job.entity.JobSchedule; +import org.springframework.data.repository.CrudRepository; + +public interface JobScheduleRepo extends CrudRepository<JobSchedule, Long> { +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/repo/VirtualJobRepo.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/VirtualJobRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/VirtualJobRepo.java new file mode 100644 index 0000000..914a1ff --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/job/repo/VirtualJobRepo.java @@ -0,0 +1,25 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.job.repo; + +import org.apache.griffin.core.job.entity.VirtualJob; + +public interface VirtualJobRepo extends JobRepo<VirtualJob> { +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java b/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java new file mode 100644 index 0000000..f38982a --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java @@ -0,0 +1,102 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.measure; + +import org.apache.commons.lang.StringUtils; +import org.apache.griffin.core.job.entity.VirtualJob; +import org.apache.griffin.core.job.repo.VirtualJobRepo; +import org.apache.griffin.core.measure.entity.ExternalMeasure; +import org.apache.griffin.core.measure.entity.Measure; +import org.apache.griffin.core.measure.repo.ExternalMeasureRepo; +import org.apache.griffin.core.util.GriffinOperationMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component("externalOperation") +public class ExternalMeasureOperationImpl implements MeasureOperation { + private static final Logger LOGGER = LoggerFactory.getLogger(ExternalMeasureOperationImpl.class); + + @Autowired + private ExternalMeasureRepo measureRepo; + @Autowired + private VirtualJobRepo jobRepo; + + @Override + public GriffinOperationMessage create(Measure measure) { + ExternalMeasure em = (ExternalMeasure) measure; + if (StringUtils.isBlank(em.getMetricName())) { + LOGGER.error("Failed to create external measure {}. Its metric name is blank.", measure.getName()); + return GriffinOperationMessage.CREATE_MEASURE_FAIL; + } + try { + em.setVirtualJob(new VirtualJob()); + em = measureRepo.save(em); + VirtualJob vj = genVirtualJob(em, em.getVirtualJob()); + jobRepo.save(vj); + return GriffinOperationMessage.CREATE_MEASURE_SUCCESS; + } catch (Exception e) { + LOGGER.error("Failed to create new measure {}.{}", em.getName(), e.getMessage()); + } + return GriffinOperationMessage.CREATE_MEASURE_FAIL; + } + + @Override + public GriffinOperationMessage update(Measure measure) { + ExternalMeasure latestMeasure = (ExternalMeasure) measure; + if (StringUtils.isBlank(latestMeasure.getMetricName())) { + LOGGER.error("Failed to create external measure {}. Its metric name is blank.", measure.getName()); + return GriffinOperationMessage.UPDATE_MEASURE_FAIL; + } + try { + ExternalMeasure originMeasure = measureRepo.findOne(latestMeasure.getId()); + VirtualJob vj = genVirtualJob(latestMeasure, originMeasure.getVirtualJob()); + latestMeasure.setVirtualJob(vj); + measureRepo.save(latestMeasure); + return GriffinOperationMessage.UPDATE_MEASURE_SUCCESS; + } catch (Exception e) { + LOGGER.error("Failed to update measure. {}", e.getMessage()); + } + return GriffinOperationMessage.UPDATE_MEASURE_FAIL; + } + + @Override + public Boolean delete(Measure measure) { + try { + ExternalMeasure em = (ExternalMeasure) measure; + em.setDeleted(true); + em.getVirtualJob().setDeleted(true); + measureRepo.save(em); + return true; + } catch (Exception e) { + LOGGER.error("Failed to delete measure. {}", e.getMessage()); + } + return false; + + } + + private VirtualJob genVirtualJob(ExternalMeasure em, VirtualJob vj) { + vj.setMeasureId(em.getId()); + vj.setJobName(em.getName()); + vj.setMetricName(em.getMetricName()); + return vj; + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java b/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java new file mode 100644 index 0000000..88c5409 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java @@ -0,0 +1,114 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.measure; + +import org.apache.commons.lang.StringUtils; +import org.apache.griffin.core.job.JobServiceImpl; +import org.apache.griffin.core.measure.entity.DataConnector; +import org.apache.griffin.core.measure.entity.DataSource; +import org.apache.griffin.core.measure.entity.GriffinMeasure; +import org.apache.griffin.core.measure.entity.Measure; +import org.apache.griffin.core.measure.repo.DataConnectorRepo; +import org.apache.griffin.core.measure.repo.MeasureRepo; +import org.apache.griffin.core.util.GriffinOperationMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.util.ArrayList; +import java.util.List; + +@Component("griffinOperation") +public class GriffinMeasureOperationImpl implements MeasureOperation { + private static final Logger LOGGER = LoggerFactory.getLogger(GriffinMeasureOperationImpl.class); + + @Autowired + private MeasureRepo<Measure> measureRepo; + @Autowired + private DataConnectorRepo dcRepo; + @Autowired + private JobServiceImpl jobService; + + + @Override + public GriffinOperationMessage create(Measure measure) { + if (!isConnectorNamesValid((GriffinMeasure) measure)) { + return GriffinOperationMessage.CREATE_MEASURE_FAIL; + } + try { + measureRepo.save(measure); + return GriffinOperationMessage.CREATE_MEASURE_SUCCESS; + } catch (Exception e) { + LOGGER.error("Failed to create new measure {}.", measure.getName(), e); + } + return GriffinOperationMessage.CREATE_MEASURE_FAIL; + } + + @Override + public GriffinOperationMessage update(Measure measure) { + try { + measureRepo.save(measure); + return GriffinOperationMessage.UPDATE_MEASURE_SUCCESS; + } catch (Exception e) { + LOGGER.error("Failed to update measure. {}", e.getMessage()); + } + return GriffinOperationMessage.UPDATE_MEASURE_FAIL; + } + + @Override + public Boolean delete(Measure measure) { + boolean pauseStatus = jobService.deleteJobsRelateToMeasure(measure.getId()); + if (!pauseStatus) { + return false; + } + measure.setDeleted(true); + measureRepo.save(measure); + return true; + } + + private boolean isConnectorNamesValid(GriffinMeasure measure) { + List<String> names = getConnectorNames(measure); + if (names.size() == 0) { + LOGGER.warn("Connector names cannot be empty."); + return false; + } + List<DataConnector> connectors = dcRepo.findByConnectorNames(names); + if (!CollectionUtils.isEmpty(connectors)) { + LOGGER.warn("Failed to create new measure {}. It's connector names already exist. ", measure.getName()); + return false; + } + return true; + } + + private List<String> getConnectorNames(GriffinMeasure measure) { + List<String> names = new ArrayList<>(); + for (DataSource source : measure.getDataSources()) { + for (DataConnector dc : source.getConnectors()) { + String name = dc.getName(); + if (!StringUtils.isEmpty(name)) { + names.add(name); + } + } + } + return names; + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java index fae0169..3b557ca 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java +++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java @@ -37,17 +37,17 @@ public class MeasureController { return measureService.getAllAliveMeasures(); } - @RequestMapping(value = "/measure/{id}", method = RequestMethod.GET) + @RequestMapping(value = "/measures/{id}", method = RequestMethod.GET) public Measure getMeasureById(@PathVariable("id") long id) { return measureService.getMeasureById(id); } - @RequestMapping(value = "/measure/{id}", method = RequestMethod.DELETE) + @RequestMapping(value = "/measures/{id}", method = RequestMethod.DELETE) public GriffinOperationMessage deleteMeasureById(@PathVariable("id") Long id) { return measureService.deleteMeasureById(id); } - @RequestMapping(value = "/measure", method = RequestMethod.PUT) + @RequestMapping(value = "/measures", method = RequestMethod.PUT) public GriffinOperationMessage updateMeasure(@RequestBody Measure measure) { return measureService.updateMeasure(measure); } @@ -57,7 +57,7 @@ public class MeasureController { return measureService.getAliveMeasuresByOwner(owner); } - @RequestMapping(value = "/measure", method = RequestMethod.POST) + @RequestMapping(value = "/measures", method = RequestMethod.POST) public GriffinOperationMessage createMeasure(@RequestBody Measure measure) { return measureService.createMeasure(measure); } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java new file mode 100644 index 0000000..80f1f30 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java @@ -0,0 +1,34 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.measure; + + +import org.apache.griffin.core.measure.entity.Measure; +import org.apache.griffin.core.util.GriffinOperationMessage; + +public interface MeasureOperation { + + GriffinOperationMessage create(Measure measure); + + GriffinOperationMessage update(Measure measure); + + Boolean delete(Measure measure); + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgController.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgController.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgController.java index abe36d9..499ee8e 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgController.java +++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgController.java @@ -19,14 +19,12 @@ under the License. package org.apache.griffin.core.measure; -import org.apache.griffin.core.job.JobService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; -import java.io.Serializable; import java.util.List; import java.util.Map; @@ -37,9 +35,6 @@ public class MeasureOrgController { @Autowired private MeasureOrgService measureOrgService; - @Autowired - private JobService jobService; - @RequestMapping(value = "/org", method = RequestMethod.GET) public List<String> getOrgs() { return measureOrgService.getOrgs(); @@ -59,9 +54,4 @@ public class MeasureOrgController { public Map<String, List<String>> getMeasureNamesGroupByOrg() { return measureOrgService.getMeasureNamesGroupByOrg(); } - - @RequestMapping(value = "/org/measure/jobs", method = RequestMethod.GET) - public Map<String, Map<String, List<Map<String, Serializable>>>> getMeasureWithJobsGroupByOrg() { - return measureOrgService.getMeasureWithJobDetailsGroupByOrg(jobService.getJobDetailsGroupByMeasureId()); - } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java index f45c636..754f3d1 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java +++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java @@ -31,5 +31,5 @@ public interface MeasureOrgService { Map<String, List<String>> getMeasureNamesGroupByOrg(); - Map<String, Map<String, List<Map<String, Serializable>>>> getMeasureWithJobDetailsGroupByOrg(Map<String, List<Map<String, Serializable>>> jobDetailsGroupByMeasure); + Map<String, Map<String, List<Map<String, Object>>>> getMeasureWithJobDetailsGroupByOrg(Map<String, List<Map<String, Object>>> jobDetailsGroupByMeasure); } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java index d4cb6a9..1d64830 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java @@ -19,12 +19,12 @@ under the License. package org.apache.griffin.core.measure; +import org.apache.griffin.core.measure.entity.GriffinMeasure; import org.apache.griffin.core.measure.entity.Measure; -import org.apache.griffin.core.measure.repo.MeasureRepo; +import org.apache.griffin.core.measure.repo.GriffinMeasureRepo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -34,7 +34,7 @@ import java.util.Map; public class MeasureOrgServiceImpl implements MeasureOrgService { @Autowired - private MeasureRepo measureRepo; + private GriffinMeasureRepo measureRepo; @Override public List<String> getOrgs() { @@ -43,20 +43,20 @@ public class MeasureOrgServiceImpl implements MeasureOrgService { @Override public List<String> getMetricNameListByOrg(String org) { - return measureRepo.findNameByOrganization(org,false); + return measureRepo.findNameByOrganization(org, false); } @Override public Map<String, List<String>> getMeasureNamesGroupByOrg() { Map<String, List<String>> orgWithMetricsMap = new HashMap<>(); - List<Measure> measures = measureRepo.findByDeleted(false); + List<GriffinMeasure> measures = measureRepo.findByDeleted(false); if (measures == null) { return null; } for (Measure measure : measures) { String orgName = measure.getOrganization(); String measureName = measure.getName(); - List<String> measureList = orgWithMetricsMap.getOrDefault(orgName, new ArrayList<String>()); + List<String> measureList = orgWithMetricsMap.getOrDefault(orgName, new ArrayList<>()); measureList.add(measureName); orgWithMetricsMap.put(orgName, measureList); } @@ -64,9 +64,9 @@ public class MeasureOrgServiceImpl implements MeasureOrgService { } @Override - public Map<String, Map<String, List<Map<String, Serializable>>>> getMeasureWithJobDetailsGroupByOrg(Map<String, List<Map<String, Serializable>>> jobDetails) { - Map<String, Map<String, List<Map<String, Serializable>>>> result = new HashMap<>(); - List<Measure> measures = measureRepo.findByDeleted(false); + public Map<String, Map<String, List<Map<String, Object>>>> getMeasureWithJobDetailsGroupByOrg(Map<String, List<Map<String, Object>>> jobDetails) { + Map<String, Map<String, List<Map<String, Object>>>> result = new HashMap<>(); + List<GriffinMeasure> measures = measureRepo.findByDeleted(false); if (measures == null) { return null; } @@ -74,8 +74,8 @@ public class MeasureOrgServiceImpl implements MeasureOrgService { String orgName = measure.getOrganization(); String measureName = measure.getName(); String measureId = measure.getId().toString(); - List<Map<String, Serializable>> jobList = jobDetails.getOrDefault(measureId, new ArrayList<>()); - Map<String, List<Map<String, Serializable>>> measureWithJobs = result.getOrDefault(orgName, new HashMap<>()); + List<Map<String, Object>> jobList = jobDetails.getOrDefault(measureId, new ArrayList<>()); + Map<String, List<Map<String, Object>>> measureWithJobs = result.getOrDefault(orgName, new HashMap<>()); measureWithJobs.put(measureName, jobList); result.put(orgName, measureWithJobs); } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java index 0e20b4f..a330d0a 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java +++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java @@ -24,11 +24,10 @@ import org.apache.griffin.core.measure.entity.Measure; import org.apache.griffin.core.util.GriffinOperationMessage; import java.util.List; -import java.util.Map; public interface MeasureService { - Iterable<Measure> getAllAliveMeasures(); + List<Measure> getAllAliveMeasures(); Measure getMeasureById(long id); http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java index 8c088c8..ecb9fdd 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java @@ -20,17 +20,16 @@ under the License. package org.apache.griffin.core.measure; -import org.apache.griffin.core.job.JobServiceImpl; +import org.apache.griffin.core.measure.entity.GriffinMeasure; import org.apache.griffin.core.measure.entity.Measure; import org.apache.griffin.core.measure.repo.MeasureRepo; import org.apache.griffin.core.util.GriffinOperationMessage; -import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.util.CollectionUtils; import java.util.List; @@ -39,79 +38,76 @@ public class MeasureServiceImpl implements MeasureService { private static final Logger LOGGER = LoggerFactory.getLogger(MeasureServiceImpl.class); @Autowired - private JobServiceImpl jobService; + private MeasureRepo<Measure> measureRepo; @Autowired - private MeasureRepo measureRepo; + @Qualifier("griffinOperation") + private MeasureOperation griffinOp; + @Autowired + @Qualifier("externalOperation") + private MeasureOperation externalOp; @Override - public Iterable<Measure> getAllAliveMeasures() { + public List<Measure> getAllAliveMeasures() { return measureRepo.findByDeleted(false); } @Override - public Measure getMeasureById(@PathVariable("id") long id) { + public Measure getMeasureById(long id) { return measureRepo.findByIdAndDeleted(id, false); } @Override - public GriffinOperationMessage deleteMeasureById(Long measureId) { - if (!measureRepo.exists(measureId)) { - return GriffinOperationMessage.RESOURCE_NOT_FOUND; - } else { - Measure measure = measureRepo.findOne(measureId); - try { - //pause all jobs related to the measure - jobService.deleteJobsRelateToMeasure(measure); - measure.setDeleted(true); - measureRepo.save(measure); - } catch (SchedulerException e) { - LOGGER.error("Delete measure id: {} name: {} failure. {}", measure.getId(), measure.getName(), e.getMessage()); - return GriffinOperationMessage.DELETE_MEASURE_BY_ID_FAIL; - } - - return GriffinOperationMessage.DELETE_MEASURE_BY_ID_SUCCESS; - } + public List<Measure> getAliveMeasuresByOwner(String owner) { + return measureRepo.findByOwnerAndDeleted(owner, false); } @Override public GriffinOperationMessage createMeasure(Measure measure) { List<Measure> aliveMeasureList = measureRepo.findByNameAndDeleted(measure.getName(), false); - if (aliveMeasureList.size() == 0) { - try { - if (measureRepo.save(measure) != null) { - return GriffinOperationMessage.CREATE_MEASURE_SUCCESS; - } else { - return GriffinOperationMessage.CREATE_MEASURE_FAIL; - } - } catch (Exception e) { - LOGGER.info("Failed to create new measure {}.{}", measure.getName(), e.getMessage()); - return GriffinOperationMessage.CREATE_MEASURE_FAIL; - } - - } else { - LOGGER.info("Failed to create new measure {}, it already exists.", measure.getName()); + if (!CollectionUtils.isEmpty(aliveMeasureList)) { + LOGGER.warn("Failed to create new measure {}, it already exists.", measure.getName()); return GriffinOperationMessage.CREATE_MEASURE_FAIL_DUPLICATE; } + MeasureOperation op = getOperation(measure); + return op.create(measure); } @Override - public List<Measure> getAliveMeasuresByOwner(String owner) { - return measureRepo.findByOwnerAndDeleted(owner, false); + public GriffinOperationMessage updateMeasure(Measure measure) { + Measure m = measureRepo.findByIdAndDeleted(measure.getId(), false); + if (m == null) { + return GriffinOperationMessage.RESOURCE_NOT_FOUND; + } + if (!m.getType().equals(measure.getType())) { + LOGGER.error("Can't update measure to different type."); + return GriffinOperationMessage.UPDATE_MEASURE_FAIL; + } + MeasureOperation op = getOperation(measure); + return op.update(measure); } @Override - public GriffinOperationMessage updateMeasure(@RequestBody Measure measure) { - if (measureRepo.findByIdAndDeleted(measure.getId(), false) == null) { + public GriffinOperationMessage deleteMeasureById(Long measureId) { + Measure measure = measureRepo.findByIdAndDeleted(measureId, false); + if (measure == null) { return GriffinOperationMessage.RESOURCE_NOT_FOUND; - } else { - try { - measureRepo.save(measure); - } catch (Exception e) { - LOGGER.error("Failed to update measure. {}", e.getMessage()); - return GriffinOperationMessage.UPDATE_MEASURE_FAIL; + } + try { + MeasureOperation op = getOperation(measure); + if (op.delete(measure)) { + return GriffinOperationMessage.DELETE_MEASURE_BY_ID_SUCCESS; } + } catch (Exception e) { + LOGGER.error("Delete measure id: {} name: {} failure. {}", measure.getId(), measure.getName(), e.getMessage()); + } + return GriffinOperationMessage.DELETE_MEASURE_BY_ID_FAIL; + } - return GriffinOperationMessage.UPDATE_MEASURE_SUCCESS; + private MeasureOperation getOperation(Measure measure) { + if (measure instanceof GriffinMeasure) { + return griffinOp; } + return externalOp; } + } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java index a36c240..3c4abf5 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java +++ b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java @@ -20,16 +20,21 @@ under the License. package org.apache.griffin.core.measure.entity; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; -import org.apache.commons.lang.StringUtils; +import org.apache.griffin.core.job.entity.SegmentPredicate; import org.apache.griffin.core.util.JsonUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.util.StringUtils; -import javax.persistence.Entity; -import javax.persistence.Transient; +import javax.persistence.*; +import javax.validation.constraints.NotNull; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; @Entity @@ -38,30 +43,84 @@ public class DataConnector extends AbstractAuditableEntity { private final static Logger LOGGER = LoggerFactory.getLogger(DataConnector.class); + @NotNull + private String name; + private String type; private String version; - private String config; + @JsonInclude(JsonInclude.Include.NON_NULL) + private String dataUnit; @JsonIgnore @Transient - private Map<String, String> configInMaps; + private String defaultDataUnit = "365000d"; + + @JsonIgnore + @Access(AccessType.PROPERTY) + private String config; + + @Transient + private Map<String, String> configMap; + + @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE}) + @JoinColumn(name = "data_connector_id") + private List<SegmentPredicate> predicates = new ArrayList<>(); + + public List<SegmentPredicate> getPredicates() { + return predicates; + } + + public void setPredicates(List<SegmentPredicate> predicates) { + this.predicates = predicates; + } + + @JsonProperty("config") + public Map<String, String> getConfigMap() throws IOException { + return configMap; + } + + @JsonProperty("config") + public void setConfigMap(Map<String, String> configMap) throws JsonProcessingException { + this.configMap = configMap; + this.config = JsonUtil.toJson(configMap); + } - public Map<String, String> getConfigInMaps() throws IOException { - if (this.configInMaps == null && !StringUtils.isEmpty(config)) { - this.configInMaps = JsonUtil.toEntity(config, new TypeReference<Map<String, String>>() {}); + public void setConfig(String config) throws IOException { + this.config = config; + this.configMap = JsonUtil.toEntity(config, new TypeReference<Map<String, String>>() { + }); + } + + public String getConfig() throws IOException { + return config; + } + + + @JsonProperty("data.unit") + public String getDataUnit() { + if (dataUnit != null) { + return dataUnit; } - return configInMaps; + return defaultDataUnit; + } + + @JsonProperty("data.unit") + public void setDataUnit(String dataUnit) { + this.dataUnit = dataUnit; } - public void setConfig(Map<String, String> configInMaps) throws JsonProcessingException { - this.configInMaps = configInMaps; - this.config = JsonUtil.toJson(configInMaps); + public String getName() { + return name; } - public Map<String, String> getConfig() throws IOException { - return getConfigInMaps(); + public void setName(String name) { + if (StringUtils.isEmpty(name)) { + LOGGER.error("Connector name cannot be empty."); + throw new NullPointerException(); + } + this.name = name; } public String getType() { @@ -84,22 +143,19 @@ public class DataConnector extends AbstractAuditableEntity { public DataConnector() { } - public DataConnector(String type, String version, String config) { + public DataConnector(String name, String type, String version, String config) throws IOException { + this.name = name; this.type = type; this.version = version; this.config = config; - TypeReference<Map<String, String>> mapType = new TypeReference<Map<String, String>>() { - }; - try { - this.configInMaps = JsonUtil.toEntity(config, mapType); - } catch (IOException e) { - LOGGER.error("Error in converting json to map. {}", e.getMessage()); - } + this.configMap = JsonUtil.toEntity(config, new TypeReference<Map<String, String>>() { + }); } @Override public String toString() { return "DataConnector{" + + "name=" + name + "type=" + type + ", version='" + version + '\'' + ", config=" + config + http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java b/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java index 14619cb..0466992 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java +++ b/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java @@ -20,7 +20,11 @@ under the License. package org.apache.griffin.core.measure.entity; + +import org.springframework.util.CollectionUtils; + import javax.persistence.*; +import java.util.ArrayList; import java.util.List; @Entity @@ -30,8 +34,8 @@ public class DataSource extends AbstractAuditableEntity { private String name; @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE}) - @JoinColumn(name = "dataSource_id") - private List<DataConnector> connectors; + @JoinColumn(name = "data_source_id") + private List<DataConnector> connectors = new ArrayList<>(); public String getName() { return name; @@ -46,6 +50,9 @@ public class DataSource extends AbstractAuditableEntity { } public void setConnectors(List<DataConnector> connectors) { + if (CollectionUtils.isEmpty(connectors)) { + throw new NullPointerException("Data connector can not be empty."); + } this.connectors = connectors; } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java b/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java index 2a70636..75a39ce 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java +++ b/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java @@ -24,6 +24,7 @@ import org.hibernate.annotations.Fetch; import org.hibernate.annotations.FetchMode; import javax.persistence.*; +import java.util.ArrayList; import java.util.List; @@ -34,7 +35,7 @@ public class EvaluateRule extends AbstractAuditableEntity { @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE}) @JoinColumn(name = "evaluateRule_id") @Fetch(FetchMode.SUBSELECT) - private List<Rule> rules; + private List<Rule> rules = new ArrayList<>(); public List<Rule> getRules() { return rules; http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/entity/ExternalMeasure.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/ExternalMeasure.java b/service/src/main/java/org/apache/griffin/core/measure/entity/ExternalMeasure.java new file mode 100644 index 0000000..eb4a19d --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/measure/entity/ExternalMeasure.java @@ -0,0 +1,71 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.measure.entity; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.griffin.core.job.entity.VirtualJob; + +import javax.persistence.CascadeType; +import javax.persistence.Entity; +import javax.persistence.FetchType; +import javax.persistence.OneToOne; + +/** + * Measures to publish metrics that processed externally + */ +@Entity +public class ExternalMeasure extends Measure { + + private String metricName; + + @JsonIgnore + @OneToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL) + private VirtualJob virtualJob; + + public ExternalMeasure() { + super(); + } + + public ExternalMeasure(String name, String description, String organization, String owner, String metricName) { + super(name, description, organization, owner); + this.metricName = metricName; + } + + public String getMetricName() { + return metricName; + } + + public void setMetricName(String metricName) { + this.metricName = metricName; + } + + public VirtualJob getVirtualJob() { + return virtualJob; + } + + public void setVirtualJob(VirtualJob virtualJob) { + this.virtualJob = virtualJob; + } + + @Override + public String getType() { + return "external"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java b/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java new file mode 100644 index 0000000..3c5c602 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java @@ -0,0 +1,115 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.measure.entity; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.commons.collections.CollectionUtils; + +import javax.persistence.*; +import javax.validation.constraints.NotNull; +import java.util.ArrayList; +import java.util.List; + +/** + * Measures processed on Griffin + */ +@Entity +public class GriffinMeasure extends Measure { + + private String processType; + + @Transient + @JsonInclude(JsonInclude.Include.NON_NULL) + private Long timestamp; + + + @NotNull + @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE}) + @JoinColumn(name = "measure_id") + private List<DataSource> dataSources = new ArrayList<>(); + + @NotNull + @OneToOne(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE}) + @JoinColumn(name = "evaluate_rule_id") + private EvaluateRule evaluateRule; + + @JsonProperty("process.type") + public String getProcessType() { + return processType; + } + + @JsonProperty("process.type") + public void setProcessType(String processType) { + this.processType = processType; + } + + @JsonProperty("data.sources") + public List<DataSource> getDataSources() { + return dataSources; + } + + @JsonProperty("data.sources") + public void setDataSources(List<DataSource> dataSources) { + if (CollectionUtils.isEmpty(dataSources)) { + throw new NullPointerException("Data source can not be empty."); + } + this.dataSources = dataSources; + } + + @JsonProperty("evaluate.rule") + public EvaluateRule getEvaluateRule() { + return evaluateRule; + } + + @JsonProperty("evaluate.rule") + public void setEvaluateRule(EvaluateRule evaluateRule) { + if (evaluateRule == null || CollectionUtils.isEmpty(evaluateRule.getRules())) { + throw new NullPointerException("Evaluate rule can not be empty."); + } + this.evaluateRule = evaluateRule; + } + + public Long getTimestamp() { + return timestamp; + } + + public void setTimestamp(Long timestamp) { + this.timestamp = timestamp; + } + + @Override + public String getType() { + return "griffin"; + } + + public GriffinMeasure() { + super(); + } + + public GriffinMeasure(Long measureId,String name, String description, String organization, String processType, String owner, List<DataSource> dataSources, EvaluateRule evaluateRule) { + super(name, description, organization, owner); + this.setId(measureId); + this.processType = processType; + this.dataSources = dataSources; + this.evaluateRule = evaluateRule; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java b/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java index d8afba4..cf2daec 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java +++ b/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java @@ -19,39 +19,31 @@ under the License. package org.apache.griffin.core.measure.entity; -import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; -import javax.persistence.*; -import java.util.List; +import javax.persistence.Entity; +import javax.persistence.Inheritance; +import javax.persistence.InheritanceType; +import javax.validation.constraints.NotNull; @Entity -public class Measure extends AbstractAuditableEntity { +@Inheritance(strategy = InheritanceType.JOINED) +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") +@JsonSubTypes({@JsonSubTypes.Type(value = GriffinMeasure.class, name = "griffin"), @JsonSubTypes.Type(value = ExternalMeasure.class, name = "external")}) +public abstract class Measure extends AbstractAuditableEntity { private static final long serialVersionUID = -4748881017029815714L; - private String name; + @NotNull + protected String name; - private String description; + protected String description; - private String organization; + protected String organization; - private String processType; + protected String owner; - /** - * record triggered time of measure - */ - private Long triggerTimeStamp = -1L; - - - @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE}) - @JoinColumn(name = "measure_id") - private List<DataSource> dataSources; - - @OneToOne(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE}) - @JoinColumn(name = "evaluateRule_id") - private EvaluateRule evaluateRule; - - private String owner; - private Boolean deleted = false; + protected Boolean deleted = false; public String getName() { return name; @@ -85,34 +77,6 @@ public class Measure extends AbstractAuditableEntity { this.owner = owner; } - @JsonProperty("process.type") - public String getProcessType() { - return processType; - } - - @JsonProperty("process.type") - public void setProcessType(String processType) { - this.processType = processType; - } - - @JsonProperty("data.sources") - public List<DataSource> getDataSources() { - return dataSources; - } - - @JsonProperty("data.sources") - public void setDataSources(List<DataSource> dataSources) { - this.dataSources = dataSources; - } - - public EvaluateRule getEvaluateRule() { - return evaluateRule; - } - - public void setEvaluateRule(EvaluateRule evaluateRule) { - this.evaluateRule = evaluateRule; - } - public Boolean getDeleted() { return this.deleted; } @@ -121,26 +85,15 @@ public class Measure extends AbstractAuditableEntity { this.deleted = deleted; } - @JsonProperty("timestamp") - public Long getTriggerTimeStamp() { - return triggerTimeStamp; - } - - @JsonProperty("timestamp") - public void setTriggerTimeStamp(Long triggerTimeStamp) { - this.triggerTimeStamp = triggerTimeStamp; - } - public Measure() { } - public Measure(String name, String description, String organization, String processType, String owner, List<DataSource> dataSources, EvaluateRule evaluateRule) { + public Measure(String name, String description, String organization, String owner) { this.name = name; this.description = description; this.organization = organization; - this.processType = processType; this.owner = owner; - this.dataSources = dataSources; - this.evaluateRule = evaluateRule; } + + public abstract String getType(); } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java index b060bc4..f0c6516 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java +++ b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java @@ -23,12 +23,9 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; -import org.apache.commons.lang.StringUtils; import org.apache.griffin.core.util.JsonUtil; -import javax.persistence.Column; -import javax.persistence.Entity; -import javax.persistence.Transient; +import javax.persistence.*; import java.io.IOException; import java.util.Map; @@ -43,10 +40,18 @@ public class Rule extends AbstractAuditableEntity { private String dqType; - @Column(length = 1024) + @Column(length = 10 * 1024) private String rule; @JsonIgnore + private String name; + + @JsonIgnore + private String description; + + @JsonIgnore + @Access(AccessType.PROPERTY) + @Column(length = 10 * 1024) private String details; @Transient @@ -86,15 +91,14 @@ public class Rule extends AbstractAuditableEntity { return details; } - public void setDetails(String details) { + private void setDetails(String details) throws IOException { this.details = details; + detailsMap = JsonUtil.toEntity(details, new TypeReference<Map<String, Object>>() { + }); } @JsonProperty("details") - public Map<String, Object> getDetailsMap() throws IOException { - if (detailsMap == null && !StringUtils.isEmpty(details)) { - detailsMap = JsonUtil.toEntity(details, new TypeReference<Map<String, Object>>() {}); - } + public Map<String, Object> getDetailsMap() { return detailsMap; } @@ -104,6 +108,22 @@ public class Rule extends AbstractAuditableEntity { this.details = JsonUtil.toJson(details); } + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + public Rule() { }
