change json format, update unit test and fix hive connect Author: Lionel Liu <[email protected]> Author: Yao <[email protected]>
Closes #126 from ahutsunshine/master. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/43f9dbf7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/43f9dbf7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/43f9dbf7 Branch: refs/heads/master Commit: 43f9dbf7bb41a35d94e695ca23748eb67689fc80 Parents: 0a3de75 Author: Lionel Liu <[email protected]> Authored: Mon Oct 9 15:12:30 2017 +0800 Committer: Lionel Liu <[email protected]> Committed: Mon Oct 9 15:12:30 2017 +0800 ---------------------------------------------------------------------- .../griffin/core/GriffinWebApplication.java | 44 +--- .../griffin/core/common/SimpleCORSFilter.java | 7 +- .../griffin/core/config/SwaggerConfig.java | 1 + .../AutowiringSpringBeanJobFactory.java | 34 +-- .../core/config/jobConfig/SparkJobConfig.java | 3 - .../core/error/exception/GriffinException.java | 32 +-- .../exception/RuntimeExceptionHandler.java | 50 ++-- .../apache/griffin/core/job/JobController.java | 20 +- .../org/apache/griffin/core/job/JobService.java | 2 +- .../apache/griffin/core/job/JobServiceImpl.java | 209 ++++++++------- .../apache/griffin/core/job/SparkSubmitJob.java | 157 ++++++----- .../griffin/core/job/entity/JobInstance.java | 11 +- .../griffin/core/job/entity/JobRequestBody.java | 3 - .../core/job/entity/LivySessionStates.java | 21 +- .../griffin/core/job/entity/SparkJobDO.java | 60 ++--- .../griffin/core/job/repo/JobInstanceRepo.java | 16 +- .../griffin/core/login/LoginController.java | 7 +- .../apache/griffin/core/login/LoginService.java | 8 +- .../griffin/core/login/LoginServiceImpl.java | 44 ++-- .../griffin/core/measure/MeasureController.java | 14 +- .../core/measure/MeasureServiceImpl.java | 38 +-- .../core/measure/entity/AuditableEntity.java | 20 +- .../core/measure/entity/DataConnector.java | 70 ++--- .../griffin/core/measure/entity/DataSource.java | 59 +++++ 
.../core/measure/entity/EvaluateRule.java | 41 ++- .../griffin/core/measure/entity/Measure.java | 76 ++---- .../griffin/core/measure/entity/Rule.java | 75 ++++++ .../core/measure/repo/DataConnectorRepo.java | 2 - .../core/measure/repo/DataSourceRepo.java | 26 ++ .../core/measure/repo/EvaluateRuleRepo.java | 2 - .../griffin/core/measure/repo/MeasureRepo.java | 19 +- .../griffin/core/measure/repo/RuleRepo.java | 26 ++ .../metastore/hive/HiveMetaStoreController.java | 73 ++++++ .../core/metastore/hive/HiveMetaStoreProxy.java | 78 ++++++ .../metastore/hive/HiveMetaStoreService.java | 39 +++ .../hive/HiveMetaStoreServiceImpl.java | 162 ++++++++++++ .../metastore/hive/HiveMetastoreController.java | 60 ----- .../core/metastore/hive/HiveMetastoreProxy.java | 78 ------ .../metastore/hive/HiveMetastoreService.java | 39 --- .../hive/HiveMetastoreServiceImpl.java | 154 ----------- .../metastore/kafka/KafkaSchemaController.java | 14 +- .../metastore/kafka/KafkaSchemaServiceImpl.java | 2 +- .../griffin/core/metric/MetricController.java | 6 +- .../griffin/core/metric/MetricServiceImpl.java | 5 +- .../griffin/core/service/GriffinController.java | 57 ++-- .../core/util/GriffinOperationMessage.java | 27 +- .../apache/griffin/core/util/GriffinUtil.java | 39 ++- .../griffin/core/job/JobControllerTest.java | 75 +++--- .../griffin/core/job/JobInstanceRepoTest.java | 97 ++++--- .../griffin/core/job/JobServiceImplTest.java | 227 ++++++++-------- .../griffin/core/job/SparkSubmitJobTest.java | 213 +++------------ .../core/measure/MeasureControllerTest.java | 152 ++++++----- .../griffin/core/measure/MeasureRepoTest.java | 231 ++++++++--------- .../core/measure/MeasureServiceImplTest.java | 216 ++++++---------- .../griffin/core/measure/MeasureTestHelper.java | 55 ++++ .../core/measure/repo/MeasureRepoTest.java | 41 +-- .../hive/HiveMetaStoreControllerTest.java | 128 +++++++++ .../hive/HiveMetaStoreServiceImplTest.java | 145 +++++++++++ .../hive/HiveMetastoreControllerTest.java | 101 -------- 
.../hive/HiveMetastoreServiceImplTest.java | 167 ------------ .../kafka/KafkaSchemaControllerTest.java | 220 ++++++++-------- .../kafka/KafkaSchemaServiceImplTest.java | 257 +++++++++---------- .../core/metric/MetricControllerTest.java | 22 +- .../core/metric/MetricServiceImplTest.java | 23 +- .../core/service/GriffinControllerTest.java | 104 ++------ .../griffin/core/util/GriffinUtilTest.java | 6 +- service/src/test/resources/Init_quartz-h2.sql | 1 - service/src/test/resources/sparkJob.properties | 52 ---- service/src/test/resources/test.sql | 88 +++---- 69 files changed, 2218 insertions(+), 2433 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/43f9dbf7/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java b/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java index 69d7d21..e5f877b 100644 --- a/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java +++ b/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java @@ -27,57 +27,17 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.annotation.EnableScheduling; -//import org.apache.griffin.core.measure.repo.ConnectorConfigRepo; @SpringBootApplication @EnableScheduling -public class GriffinWebApplication/* implements CommandLineRunner*/{ +public class GriffinWebApplication { private static final Logger LOGGER = LoggerFactory.getLogger(GriffinWebApplication.class); + public static void main(String[] args) { LOGGER.info("application start"); SpringApplication.run(GriffinWebApplication.class, args); } -// @Autowired -// MeasureRepo measureRepo; -// @Autowired -// EvaluateRuleRepo 
evaluateRuleRepo; -// @Autowired -// DataConnectorRepo connectorRepo; -// -// public void run(String... strings) throws Exception { -// HashMap<String,String> configMap1=new HashMap<>(); -// configMap1.put("database","default"); -// configMap1.put("table.name","test_data_src"); -// HashMap<String,String> configMap2=new HashMap<>(); -// configMap2.put("database","default"); -// configMap2.put("table.name","test_data_tgt"); -// String configJson1 = new ObjectMapper().writeValueAsString(configMap1); -// String configJson2 = new ObjectMapper().writeValueAsString(configMap2); -// -// DataConnector source = new DataConnector(ConnectorType.HIVE, "1.2", configJson1); -// DataConnector target = new DataConnector(ConnectorType.HIVE, "1.2", configJson2); -// -// String rules = "$source.uage > 100 AND $source.uid = $target.uid AND $source.uage + 12 = $target.uage + 10 + 2 AND $source.udes + 11 = $target.udes + 1 + 1"; -// -// EvaluateRule eRule = new EvaluateRule(1,rules); -// -// Measure measure = new Measure("viewitem_hourly","bevssoj description", Measure.MearuseType.accuracy, "bullseye", source, target, eRule,"test1"); -// measureRepo.save(measure); -// -// DataConnector source2 = new DataConnector(ConnectorType.HIVE, "1.2", configJson1); -// DataConnector target2 = new DataConnector(ConnectorType.HIVE, "1.2", configJson2); -// EvaluateRule eRule2 = new EvaluateRule(1,rules); -// Measure measure2 = new Measure("search_hourly","test description", Measure.MearuseType.accuracy, "bullseye", source2, target2, eRule2,"test1"); -// measureRepo.save(measure2); -// -// DataConnector source3 = new DataConnector(ConnectorType.HIVE, "1.2", configJson1); -// DataConnector target3 = new DataConnector(ConnectorType.HIVE, "1.2", configJson2); -// EvaluateRule eRule3 = new EvaluateRule(1,rules); -// Measure measure3 = new Measure("buy_hourly","test_just_inthere description", Measure.MearuseType.accuracy, "bullseye", source3, target3, eRule3,"test1"); -// measureRepo.save(measure3); -// } - 
@Bean public SimpleCORSFilter simpleFilter() { return new SimpleCORSFilter(); http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/43f9dbf7/service/src/main/java/org/apache/griffin/core/common/SimpleCORSFilter.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/common/SimpleCORSFilter.java b/service/src/main/java/org/apache/griffin/core/common/SimpleCORSFilter.java index b7946a3..a5cd687 100644 --- a/service/src/main/java/org/apache/griffin/core/common/SimpleCORSFilter.java +++ b/service/src/main/java/org/apache/griffin/core/common/SimpleCORSFilter.java @@ -17,6 +17,7 @@ specific language governing permissions and limitations under the License. */ package org.apache.griffin.core.common; + import javax.servlet.*; import javax.servlet.http.HttpServletResponse; import java.io.IOException; @@ -33,10 +34,12 @@ public class SimpleCORSFilter implements Filter { } @Override - public void init(FilterConfig filterConfig) {} + public void init(FilterConfig filterConfig) { + } @Override - public void destroy() {} + public void destroy() { + } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/43f9dbf7/service/src/main/java/org/apache/griffin/core/config/SwaggerConfig.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/config/SwaggerConfig.java b/service/src/main/java/org/apache/griffin/core/config/SwaggerConfig.java index bc388e2..15dce47 100644 --- a/service/src/main/java/org/apache/griffin/core/config/SwaggerConfig.java +++ b/service/src/main/java/org/apache/griffin/core/config/SwaggerConfig.java @@ -16,6 +16,7 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
*/ + package org.apache.griffin.core.config; import org.springframework.context.annotation.Configuration; http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/43f9dbf7/service/src/main/java/org/apache/griffin/core/config/jobConfig/AutowiringSpringBeanJobFactory.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/config/jobConfig/AutowiringSpringBeanJobFactory.java b/service/src/main/java/org/apache/griffin/core/config/jobConfig/AutowiringSpringBeanJobFactory.java index 3cbfc7d..be2c02d 100644 --- a/service/src/main/java/org/apache/griffin/core/config/jobConfig/AutowiringSpringBeanJobFactory.java +++ b/service/src/main/java/org/apache/griffin/core/config/jobConfig/AutowiringSpringBeanJobFactory.java @@ -28,27 +28,27 @@ import org.springframework.context.ApplicationContextAware; import org.springframework.scheduling.quartz.SpringBeanJobFactory; public final class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory - implements ApplicationContextAware { - private static final Logger LOGGER = LoggerFactory.getLogger(AutowiringSpringBeanJobFactory.class); + implements ApplicationContextAware { + private static final Logger LOGGER = LoggerFactory.getLogger(AutowiringSpringBeanJobFactory.class); - private transient AutowireCapableBeanFactory beanFactory; + private transient AutowireCapableBeanFactory beanFactory; - @Override - public void setApplicationContext(final ApplicationContext context) { - beanFactory = context.getAutowireCapableBeanFactory(); - } + @Override + public void setApplicationContext(final ApplicationContext context) { + beanFactory = context.getAutowireCapableBeanFactory(); + } - @Override - protected Object createJobInstance(final TriggerFiredBundle bundle) { + @Override + protected Object createJobInstance(final TriggerFiredBundle bundle) { - try { - final Object job = super.createJobInstance(bundle); - beanFactory.autowireBean(job); - return job; + 
try { + final Object job = super.createJobInstance(bundle); + beanFactory.autowireBean(job); + return job; - } catch (Exception e) { - LOGGER.error("fail to create job instance. "+e); + } catch (Exception e) { + LOGGER.error("fail to create job instance. {}", e.getMessage()); + } + return null; } - return null; - } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/43f9dbf7/service/src/main/java/org/apache/griffin/core/config/jobConfig/SparkJobConfig.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/config/jobConfig/SparkJobConfig.java b/service/src/main/java/org/apache/griffin/core/config/jobConfig/SparkJobConfig.java index 2089ca2..4e41194 100644 --- a/service/src/main/java/org/apache/griffin/core/config/jobConfig/SparkJobConfig.java +++ b/service/src/main/java/org/apache/griffin/core/config/jobConfig/SparkJobConfig.java @@ -25,9 +25,6 @@ import org.springframework.context.annotation.Configuration; import java.util.Properties; -/** - * Created by xiangrchen on 7/26/17. 
- */ @Configuration public class SparkJobConfig { @Bean(name = "sparkJobProps") http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/43f9dbf7/service/src/main/java/org/apache/griffin/core/error/exception/GriffinException.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/error/exception/GriffinException.java b/service/src/main/java/org/apache/griffin/core/error/exception/GriffinException.java index 8d9d8a7..67fa8ae 100644 --- a/service/src/main/java/org/apache/griffin/core/error/exception/GriffinException.java +++ b/service/src/main/java/org/apache/griffin/core/error/exception/GriffinException.java @@ -22,20 +22,20 @@ package org.apache.griffin.core.error.exception; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.ResponseStatus; -/** - * Created by xiangrchen on 7/27/17. - */ -public abstract class GriffinException extends RuntimeException{ - @ResponseStatus(value = HttpStatus.GATEWAY_TIMEOUT,reason="Fail to Connect Kafka") - public static class KafkaConnectionException extends GriffinException{ - } - @ResponseStatus(value = HttpStatus.GATEWAY_TIMEOUT,reason="Fail to Connect Hive") - public static class HiveConnectionException extends GriffinException{ - } - @ResponseStatus(value = HttpStatus.NOT_FOUND,reason="Fail to Get HealthInfo") - public static class GetHealthInfoFailureException extends GriffinException { - } - @ResponseStatus(value = HttpStatus.NOT_FOUND,reason="Fail to Get Jobs") - public static class GetJobsFailureException extends GriffinException { - } +public abstract class GriffinException extends RuntimeException { + @ResponseStatus(value = HttpStatus.GATEWAY_TIMEOUT, reason = "Fail to Connect Kafka") + public static class KafkaConnectionException extends GriffinException { + } + + @ResponseStatus(value = HttpStatus.GATEWAY_TIMEOUT, reason = "Fail to Connect Hive") + public static class HiveConnectionException 
extends GriffinException { + } + + @ResponseStatus(value = HttpStatus.NOT_FOUND, reason = "Fail to Get HealthInfo") + public static class GetHealthInfoFailureException extends GriffinException { + } + + @ResponseStatus(value = HttpStatus.NOT_FOUND, reason = "Fail to Get Jobs") + public static class GetJobsFailureException extends GriffinException { + } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/43f9dbf7/service/src/main/java/org/apache/griffin/core/error/exception/RuntimeExceptionHandler.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/error/exception/RuntimeExceptionHandler.java b/service/src/main/java/org/apache/griffin/core/error/exception/RuntimeExceptionHandler.java index ba1d8ed..d35fc44 100644 --- a/service/src/main/java/org/apache/griffin/core/error/exception/RuntimeExceptionHandler.java +++ b/service/src/main/java/org/apache/griffin/core/error/exception/RuntimeExceptionHandler.java @@ -16,6 +16,7 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ + package org.apache.griffin.core.error.exception; import org.apache.griffin.core.util.GriffinOperationMessage; @@ -32,33 +33,30 @@ import org.springframework.web.context.request.ServletWebRequest; import javax.servlet.http.HttpServletRequest; import java.util.Map; -/** - * Created by xiangrchen on 7/24/17. - */ @ControllerAdvice @ResponseBody public class RuntimeExceptionHandler { - private static final Logger LOGGER = LoggerFactory.getLogger(RuntimeExceptionHandler.class); - - @ExceptionHandler(RuntimeException.class) - public ResponseEntity<Map<String, Object>> handleUnexpectedRuntimeException(RuntimeException e, HttpServletRequest request){ - LOGGER.error("Unexpected RuntimeException. 
"+e); - return setExceptionResponse(request, HttpStatus.INTERNAL_SERVER_ERROR, GriffinOperationMessage.UNEXPECTED_RUNTIME_EXCEPTION); - } - - @ExceptionHandler(value = GriffinException.class) - public void handleCustomException(GriffinException e) throws GriffinException { - throw e; - } - - private ResponseEntity<Map<String, Object>> setExceptionResponse(HttpServletRequest request, HttpStatus status, - GriffinOperationMessage message) { - request.setAttribute("javax.servlet.error.status_code", status.value()); - request.setAttribute("javax.servlet.error.message", message.getDescription()); - request.setAttribute("javax.servlet.error.error", status.toString()); - request.setAttribute("javax.servlet.error.request_uri", request.getRequestURI()); - Map<String, Object> map=(new DefaultErrorAttributes()) - .getErrorAttributes(new ServletWebRequest(request), false); - return new ResponseEntity(map, status); - } + private static final Logger LOGGER = LoggerFactory.getLogger(RuntimeExceptionHandler.class); + + @ExceptionHandler(RuntimeException.class) + public ResponseEntity<Map<String, Object>> handleUnexpectedRuntimeException(RuntimeException e, HttpServletRequest request) { + LOGGER.error("Unexpected RuntimeException. 
" + e); + return setExceptionResponse(request, HttpStatus.INTERNAL_SERVER_ERROR, GriffinOperationMessage.UNEXPECTED_RUNTIME_EXCEPTION); + } + + @ExceptionHandler(value = GriffinException.class) + public void handleCustomException(GriffinException e) throws GriffinException { + throw e; + } + + private ResponseEntity<Map<String, Object>> setExceptionResponse(HttpServletRequest request, HttpStatus status, + GriffinOperationMessage message) { + request.setAttribute("javax.servlet.error.status_code", status.value()); + request.setAttribute("javax.servlet.error.message", message.getDescription()); + request.setAttribute("javax.servlet.error.error", status.toString()); + request.setAttribute("javax.servlet.error.request_uri", request.getRequestURI()); + Map<String, Object> map = (new DefaultErrorAttributes()) + .getErrorAttributes(new ServletWebRequest(request), false); + return new ResponseEntity(map, status); + } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/43f9dbf7/service/src/main/java/org/apache/griffin/core/job/JobController.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobController.java b/service/src/main/java/org/apache/griffin/core/job/JobController.java index ec515b2..3254530 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobController.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobController.java @@ -40,32 +40,32 @@ public class JobController { @Autowired private JobService jobService; - @RequestMapping(value = "/",method = RequestMethod.GET) + @RequestMapping(value = "/", method = RequestMethod.GET) public List<Map<String, Serializable>> getJobs() { return jobService.getAliveJobs(); } @RequestMapping(value = "", method = RequestMethod.POST) public GriffinOperationMessage addJob(@RequestParam("group") String groupName, - @RequestParam("jobName") String jobName, - @RequestParam("measureId") Long measureId, - 
@RequestBody JobRequestBody jobRequestBody) { - return jobService.addJob(groupName,jobName, measureId, jobRequestBody); + @RequestParam("jobName") String jobName, + @RequestParam("measureId") Long measureId, + @RequestBody JobRequestBody jobRequestBody) { + return jobService.addJob(groupName, jobName, measureId, jobRequestBody); } @RequestMapping(value = "", method = RequestMethod.DELETE) public GriffinOperationMessage deleteJob(@RequestParam("group") String group, @RequestParam("jobName") String jobName) { - return jobService.deleteJob(group,jobName); + return jobService.deleteJob(group, jobName); } - @RequestMapping(value = "/instances",method = RequestMethod.GET) + @RequestMapping(value = "/instances", method = RequestMethod.GET) public List<JobInstance> findInstancesOfJob(@RequestParam("group") String group, @RequestParam("jobName") String jobName, @RequestParam("page") int page, @RequestParam("size") int size) { - return jobService.findInstancesOfJob(group,jobName,page,size); + return jobService.findInstancesOfJob(group, jobName, page, size); } - @RequestMapping(value = "/health",method = RequestMethod.GET) - public JobHealth getHealthInfo() { + @RequestMapping(value = "/health", method = RequestMethod.GET) + public JobHealth getHealthInfo() { return jobService.getHealthInfo(); } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/43f9dbf7/service/src/main/java/org/apache/griffin/core/job/JobService.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobService.java b/service/src/main/java/org/apache/griffin/core/job/JobService.java index 4482da5..23f8a82 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobService.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobService.java @@ -36,7 +36,7 @@ public interface JobService { GriffinOperationMessage pauseJob(String group, String name); - GriffinOperationMessage deleteJob(String 
groupName,String jobName); + GriffinOperationMessage deleteJob(String groupName, String jobName); List<JobInstance> findInstancesOfJob(String group, String name, int page, int size); http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/43f9dbf7/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java index 41bd399..5be3f8d 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java @@ -48,9 +48,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.*; -import static org.apache.griffin.core.util.GriffinOperationMessage.CREATE_JOB_FAIL; -import static org.apache.griffin.core.util.GriffinOperationMessage.PAUSE_JOB_SUCCESS; -import static org.apache.griffin.core.util.GriffinOperationMessage.SET_JOB_DELETED_STATUS_SUCCESS; +import static org.apache.griffin.core.util.GriffinOperationMessage.*; import static org.quartz.JobBuilder.newJob; import static org.quartz.JobKey.jobKey; import static org.quartz.TriggerBuilder.newTrigger; @@ -61,13 +59,13 @@ public class JobServiceImpl implements JobService { private static final Logger LOGGER = LoggerFactory.getLogger(JobServiceImpl.class); @Autowired - SchedulerFactoryBean factory; + private SchedulerFactoryBean factory; @Autowired - JobInstanceRepo jobInstanceRepo; + private JobInstanceRepo jobInstanceRepo; @Autowired - Properties sparkJobProps; + private Properties sparkJobProps; - public JobServiceImpl(){ + public JobServiceImpl() { } @Override @@ -77,14 +75,14 @@ public class JobServiceImpl implements JobService { try { for (String groupName : scheduler.getJobGroupNames()) { for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName))) { - 
Map jobInfoMap = genJobInfoMap(scheduler, jobKey); - if(jobInfoMap.size()!=0 && isJobDeleted(scheduler, jobKey) == false){ + Map jobInfoMap = getJobInfoMap(scheduler, jobKey); + if (jobInfoMap.size() != 0 && !isJobDeleted(scheduler, jobKey)) { list.add(jobInfoMap); } } } } catch (SchedulerException e) { - LOGGER.error("failed to get running jobs."+e); + LOGGER.error("failed to get running jobs.{}", e.getMessage()); throw new GetJobsFailureException(); } return list; @@ -92,44 +90,41 @@ public class JobServiceImpl implements JobService { private boolean isJobDeleted(Scheduler scheduler, JobKey jobKey) throws SchedulerException { JobDataMap jobDataMap = scheduler.getJobDetail(jobKey).getJobDataMap(); - boolean status=jobDataMap.getBooleanFromString("deleted"); - return status; + return jobDataMap.getBooleanFromString("deleted"); } - public Map genJobInfoMap(Scheduler scheduler,JobKey jobKey) throws SchedulerException { + private Map getJobInfoMap(Scheduler scheduler, JobKey jobKey) throws SchedulerException { List<Trigger> triggers = (List<Trigger>) scheduler.getTriggersOfJob(jobKey); Map<String, Serializable> jobInfoMap = new HashMap<>(); - if (triggers==null || triggers.size() == 0){ + if (triggers == null || triggers.size() == 0) { return jobInfoMap; } JobDetail jd = scheduler.getJobDetail(jobKey); Date nextFireTime = triggers.get(0).getNextFireTime(); - Date previousFireTime=triggers.get(0).getPreviousFireTime(); - Trigger.TriggerState triggerState=scheduler.getTriggerState(triggers.get(0).getKey()); + Date previousFireTime = triggers.get(0).getPreviousFireTime(); + Trigger.TriggerState triggerState = scheduler.getTriggerState(triggers.get(0).getKey()); jobInfoMap.put("jobName", jobKey.getName()); jobInfoMap.put("groupName", jobKey.getGroup()); - if (nextFireTime!=null){ + if (nextFireTime != null) { jobInfoMap.put("nextFireTime", nextFireTime.getTime()); - } - else { + } else { jobInfoMap.put("nextFireTime", -1); } - if (previousFireTime!=null) { + if 
(previousFireTime != null) { jobInfoMap.put("previousFireTime", previousFireTime.getTime()); - } - else { + } else { jobInfoMap.put("previousFireTime", -1); } - jobInfoMap.put("triggerState",triggerState); + jobInfoMap.put("triggerState", triggerState); jobInfoMap.put("measureId", jd.getJobDataMap().getString("measureId")); - jobInfoMap.put("sourcePattern",jd.getJobDataMap().getString("sourcePattern")); - jobInfoMap.put("targetPattern",jd.getJobDataMap().getString("targetPattern")); - if(StringUtils.isNotEmpty(jd.getJobDataMap().getString("blockStartTimestamp"))) { + jobInfoMap.put("sourcePattern", jd.getJobDataMap().getString("sourcePattern")); + jobInfoMap.put("targetPattern", jd.getJobDataMap().getString("targetPattern")); + if (StringUtils.isNotEmpty(jd.getJobDataMap().getString("blockStartTimestamp"))) { jobInfoMap.put("blockStartTimestamp", jd.getJobDataMap().getString("blockStartTimestamp")); } - jobInfoMap.put("jobStartTime",jd.getJobDataMap().getString("jobStartTime")); - jobInfoMap.put("interval",jd.getJobDataMap().getString("interval")); + jobInfoMap.put("jobStartTime", jd.getJobDataMap().getString("jobStartTime")); + jobInfoMap.put("interval", jd.getJobDataMap().getString("interval")); return jobInfoMap; } @@ -137,21 +132,20 @@ public class JobServiceImpl implements JobService { public GriffinOperationMessage addJob(String groupName, String jobName, Long measureId, JobRequestBody jobRequestBody) { int interval; Date jobStartTime; - try{ + try { interval = Integer.parseInt(jobRequestBody.getInterval()); - jobStartTime=new Date(Long.parseLong(jobRequestBody.getJobStartTime())); - setJobStartTime(jobStartTime,interval); - }catch (Exception e){ - LOGGER.info("jobStartTime or interval format error! "+e); + jobStartTime = new Date(Long.parseLong(jobRequestBody.getJobStartTime())); + setJobStartTime(jobStartTime, interval); + } catch (Exception e) { + LOGGER.info("jobStartTime or interval format error! 
{}", e.getMessage()); return CREATE_JOB_FAIL; } try { Scheduler scheduler = factory.getObject(); TriggerKey triggerKey = triggerKey(jobName, groupName); if (scheduler.checkExists(triggerKey)) { - LOGGER.error("the triggerKey(jobName,groupName) "+jobName+" has been used."); + LOGGER.error("the triggerKey(jobName,groupName) {} has been used.", jobName); return CREATE_JOB_FAIL; - //scheduler.unscheduleJob(triggerKey); } JobKey jobKey = jobKey(jobName, groupName); JobDetail jobDetail; @@ -179,25 +173,25 @@ public class JobServiceImpl implements JobService { scheduler.scheduleJob(trigger); return GriffinOperationMessage.CREATE_JOB_SUCCESS; } catch (SchedulerException e) { - LOGGER.error("SchedulerException when add job.", e); + LOGGER.error("SchedulerException when add job. {}", e.getMessage()); return CREATE_JOB_FAIL; } } - public void setJobStartTime(Date jobStartTime,int interval){ - long currentTimestamp=System.currentTimeMillis(); - long jobstartTimestamp=jobStartTime.getTime(); + private void setJobStartTime(Date jobStartTime, int interval) { + long currentTimestamp = System.currentTimeMillis(); + long jobStartTimestamp = jobStartTime.getTime(); //if jobStartTime is before currentTimestamp, reset it with a future time - if(jobStartTime.before(new Date(currentTimestamp))){ - long n=(currentTimestamp-jobstartTimestamp)/(long)(interval*1000); - jobstartTimestamp=jobstartTimestamp+(n+1)*(long)(interval*1000); - jobStartTime.setTime(jobstartTimestamp); + if (jobStartTime.before(new Date(currentTimestamp))) { + long n = (currentTimestamp - jobStartTimestamp) / (long) (interval * 1000); + jobStartTimestamp = jobStartTimestamp + (n + 1) * (long) (interval * 1000); + jobStartTime.setTime(jobStartTimestamp); } } - public void setJobData(JobDetail jobDetail, JobRequestBody jobRequestBody, Long measureId, String groupName, String jobName){ - jobDetail.getJobDataMap().put("groupName",groupName); - jobDetail.getJobDataMap().put("jobName",jobName); + private void 
setJobData(JobDetail jobDetail, JobRequestBody jobRequestBody, Long measureId, String groupName, String jobName) { + jobDetail.getJobDataMap().put("groupName", groupName); + jobDetail.getJobDataMap().put("jobName", jobName); jobDetail.getJobDataMap().put("measureId", measureId.toString()); jobDetail.getJobDataMap().put("sourcePattern", jobRequestBody.getSourcePattern()); jobDetail.getJobDataMap().put("targetPattern", jobRequestBody.getTargetPattern()); @@ -209,26 +203,26 @@ public class JobServiceImpl implements JobService { } @Override - public GriffinOperationMessage pauseJob(String group, String name){ + public GriffinOperationMessage pauseJob(String group, String name) { try { Scheduler scheduler = factory.getObject(); scheduler.pauseJob(new JobKey(name, group)); return GriffinOperationMessage.PAUSE_JOB_SUCCESS; } catch (SchedulerException e) { - LOGGER.error(GriffinOperationMessage.PAUSE_JOB_FAIL+""+e); + LOGGER.error("{} {}", GriffinOperationMessage.PAUSE_JOB_FAIL, e.getMessage()); return GriffinOperationMessage.PAUSE_JOB_FAIL; } } - private GriffinOperationMessage setJobDeleted(String group, String name){ + private GriffinOperationMessage setJobDeleted(String group, String name) { try { Scheduler scheduler = factory.getObject(); - JobDetail jobDetail=scheduler.getJobDetail(new JobKey(name, group)); + JobDetail jobDetail = scheduler.getJobDetail(new JobKey(name, group)); jobDetail.getJobDataMap().putAsString("deleted", true); scheduler.addJob(jobDetail, true); return GriffinOperationMessage.SET_JOB_DELETED_STATUS_SUCCESS; } catch (SchedulerException e) { - LOGGER.error(GriffinOperationMessage.PAUSE_JOB_FAIL+""+e); + LOGGER.error("{} {}", GriffinOperationMessage.PAUSE_JOB_FAIL, e.getMessage()); return GriffinOperationMessage.SET_JOB_DELETED_STATUS_FAIL; } } @@ -237,6 +231,7 @@ public class JobServiceImpl implements JobService { * logically delete * 1. pause these jobs * 2. 
set these jobs as deleted status + * * @param group * @param name * @return @@ -244,8 +239,8 @@ public class JobServiceImpl implements JobService { @Override public GriffinOperationMessage deleteJob(String group, String name) { //logically delete - if (pauseJob(group,name).equals(PAUSE_JOB_SUCCESS) && - setJobDeleted(group, name).equals(SET_JOB_DELETED_STATUS_SUCCESS)){ + if (pauseJob(group, name).equals(PAUSE_JOB_SUCCESS) && + setJobDeleted(group, name).equals(SET_JOB_DELETED_STATUS_SUCCESS)) { return GriffinOperationMessage.DELETE_JOB_SUCCESS; } return GriffinOperationMessage.DELETE_JOB_FAIL; @@ -255,87 +250,91 @@ public class JobServiceImpl implements JobService { * deleteJobsRelateToMeasure * 1. search jobs related to measure * 2. deleteJob + * * @param measure */ public void deleteJobsRelateToMeasure(Measure measure) { Scheduler scheduler = factory.getObject(); try { - for(JobKey jobKey: scheduler.getJobKeys(GroupMatcher.anyGroup())){//get all jobs + for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.anyGroup())) {//get all jobs JobDetail jobDetail = scheduler.getJobDetail(jobKey); JobDataMap jobDataMap = jobDetail.getJobDataMap(); - if(jobDataMap.getString("measureId").equals(measure.getId().toString())){ + if (jobDataMap.getString("measureId").equals(measure.getId().toString())) { //select jobs related to measureId, - deleteJob(jobKey.getGroup(),jobKey.getName()); - LOGGER.info(jobKey.getGroup()+" "+jobKey.getName()+" is paused and logically deleted."); + deleteJob(jobKey.getGroup(), jobKey.getName()); + LOGGER.info("{} {} is paused and logically deleted.", jobKey.getGroup(), jobKey.getName()); } } } catch (SchedulerException e) { - LOGGER.error("Fail to stop jobs related to measure id: " + measure.getId()+"name: "+measure.getName()); + LOGGER.error("Fail to stop jobs related to measure id: {} name: {}", measure.getId(), measure.getName()); + LOGGER.error("Fail to stop jobs related to measure id: {} name: {}", measure.getId(), measure.getName()); } } 
@Override public List<JobInstance> findInstancesOfJob(String group, String jobName, int page, int size) { //query and return instances - Pageable pageRequest=new PageRequest(page,size, Sort.Direction.DESC,"timestamp"); - return jobInstanceRepo.findByGroupNameAndJobName(group,jobName,pageRequest); + Pageable pageRequest = new PageRequest(page, size, Sort.Direction.DESC, "timestamp"); + return jobInstanceRepo.findByGroupNameAndJobName(group, jobName, pageRequest); } @Scheduled(fixedDelayString = "${jobInstance.fixedDelay.in.milliseconds}") - public void syncInstancesOfAllJobs(){ - List<Object> groupJobList=jobInstanceRepo.findGroupWithJobName(); - for (Object groupJobObj : groupJobList){ - try{ - Object[] groupJob=(Object[])groupJobObj; - if (groupJob!=null && groupJob.length==2){ - syncInstancesOfJob(groupJob[0].toString(),groupJob[1].toString()); + public void syncInstancesOfAllJobs() { + List<Object> groupJobList = jobInstanceRepo.findGroupWithJobName(); + for (Object groupJobObj : groupJobList) { + try { + Object[] groupJob = (Object[]) groupJobObj; + if (groupJob != null && groupJob.length == 2) { + syncInstancesOfJob(groupJob[0].toString(), groupJob[1].toString()); } - }catch (Exception e){ - LOGGER.error("schedule update instances of all jobs failed. "+e); + } catch (Exception e) { + LOGGER.error("schedule update instances of all jobs failed. {}", e.getMessage()); } } } /** * call livy to update jobInstance table in mysql. + * * @param group * @param jobName */ - public void syncInstancesOfJob(String group, String jobName) { + private void syncInstancesOfJob(String group, String jobName) { //update all instance info belongs to this group and job. 
- List<JobInstance> jobInstanceList=jobInstanceRepo.findByGroupNameAndJobName(group,jobName); - for (JobInstance jobInstance:jobInstanceList){ - if (!LivySessionStates.isActive(jobInstance.getState())){ + List<JobInstance> jobInstanceList = jobInstanceRepo.findByGroupNameAndJobName(group, jobName); + for (JobInstance jobInstance : jobInstanceList) { + if (!LivySessionStates.isActive(jobInstance.getState())) { continue; } - String uri=sparkJobProps.getProperty("livy.uri")+"/"+jobInstance.getSessionId(); - RestTemplate restTemplate=new RestTemplate(); - String resultStr=null; - try{ - resultStr=restTemplate.getForObject(uri,String.class); - }catch (Exception e){ - LOGGER.error("spark session "+jobInstance.getSessionId()+" has overdue, set state as unknown!\n"+e); + String uri = sparkJobProps.getProperty("livy.uri") + "/" + jobInstance.getSessionId(); + RestTemplate restTemplate = new RestTemplate(); + String resultStr; + try { + resultStr = restTemplate.getForObject(uri, String.class); + } catch (Exception e) { + LOGGER.error("spark session {} has overdue, set state as unknown!\n {}", jobInstance.getSessionId(), e.getMessage()); //if server cannot get session from Livy, set State as unknown. jobInstance.setState(LivySessionStates.State.unknown); jobInstanceRepo.save(jobInstance); continue; } - TypeReference<HashMap<String,Object>> type=new TypeReference<HashMap<String,Object>>(){}; - HashMap<String,Object> resultMap; + TypeReference<HashMap<String, Object>> type = new TypeReference<HashMap<String, Object>>() { + }; + HashMap<String, Object> resultMap; try { - resultMap = GriffinUtil.toEntity(resultStr,type); + resultMap = GriffinUtil.toEntity(resultStr, type); } catch (IOException e) { - LOGGER.error("jobInstance jsonStr convert to map failed. "+e); + LOGGER.error("jobInstance jsonStr convert to map failed. 
{}", e.getMessage()); continue; } - try{ - if (resultMap!=null && resultMap.size()!=0){ + try { + if (resultMap != null && resultMap.size() != 0) { jobInstance.setState(LivySessionStates.State.valueOf(resultMap.get("state").toString())); jobInstance.setAppId(resultMap.get("appId").toString()); - jobInstance.setAppUri(sparkJobProps.getProperty("spark.uri")+"/cluster/app/"+resultMap.get("appId").toString()); + jobInstance.setAppUri(sparkJobProps.getProperty("spark.uri") + "/cluster/app/" + resultMap.get("appId").toString()); } - }catch (Exception e){ - LOGGER.warn(group+","+jobName+"job Instance has some null field (state or appId). "+e); + } catch (Exception e) { + LOGGER.warn("{},{} job Instance has some null field (state or appId). {}", group, jobName, e.getMessage()); continue; } jobInstanceRepo.save(jobInstance); @@ -344,35 +343,35 @@ public class JobServiceImpl implements JobService { /** * a job is regard as healthy job when its latest instance is in healthy state. + * * @return */ @Override - public JobHealth getHealthInfo() { - Scheduler scheduler=factory.getObject(); - int jobCount= 0; - int notHealthyCount=0; + public JobHealth getHealthInfo() { + Scheduler scheduler = factory.getObject(); + int jobCount = 0; + int notHealthyCount = 0; try { - for (String groupName : scheduler.getJobGroupNames()){ - for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName))){ + for (String groupName : scheduler.getJobGroupNames()) { + for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName))) { jobCount++; - String jobName=jobKey.getName(); - String jobGroup=jobKey.getGroup(); - Pageable pageRequest=new PageRequest(0,1, Sort.Direction.DESC,"timestamp"); + String jobName = jobKey.getName(); + String jobGroup = jobKey.getGroup(); + Pageable pageRequest = new PageRequest(0, 1, Sort.Direction.DESC, "timestamp"); JobInstance latestJobInstance; - if (jobInstanceRepo.findByGroupNameAndJobName(jobGroup,jobName,pageRequest)!=null - 
&&jobInstanceRepo.findByGroupNameAndJobName(jobGroup,jobName,pageRequest).size()>0){ - latestJobInstance=jobInstanceRepo.findByGroupNameAndJobName(jobGroup,jobName,pageRequest).get(0); - if(!LivySessionStates.isHeathy(latestJobInstance.getState())){ + if (jobInstanceRepo.findByGroupNameAndJobName(jobGroup, jobName, pageRequest) != null + && jobInstanceRepo.findByGroupNameAndJobName(jobGroup, jobName, pageRequest).size() > 0) { + latestJobInstance = jobInstanceRepo.findByGroupNameAndJobName(jobGroup, jobName, pageRequest).get(0); + if (!LivySessionStates.isHeathy(latestJobInstance.getState())) { notHealthyCount++; } } } } } catch (SchedulerException e) { - LOGGER.error(""+e); + LOGGER.error(e.getMessage()); throw new GetHealthInfoFailureException(); } - JobHealth jobHealth=new JobHealth(jobCount-notHealthyCount,jobCount); - return jobHealth; + return new JobHealth(jobCount - notHealthyCount, jobCount); } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/43f9dbf7/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java index 47157e0..a6d7487 100644 --- a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java +++ b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java @@ -21,14 +21,13 @@ package org.apache.griffin.core.job; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectWriter; import org.apache.commons.lang.StringUtils; import org.apache.griffin.core.job.entity.JobInstance; import org.apache.griffin.core.job.entity.LivySessionStates; import org.apache.griffin.core.job.entity.SparkJobDO; import 
org.apache.griffin.core.job.repo.JobInstanceRepo; import org.apache.griffin.core.measure.entity.DataConnector; +import org.apache.griffin.core.measure.entity.DataSource; import org.apache.griffin.core.measure.entity.Measure; import org.apache.griffin.core.measure.repo.MeasureRepo; import org.apache.griffin.core.util.GriffinUtil; @@ -65,11 +64,11 @@ public class SparkSubmitJob implements Job { * for example * sourcePatternItems or targetPatternItems is like "YYYYMMDD","HH",... */ - private String[] sourcePatternItems,targetPatternItems; + private String[] sourcePatternItems, targetPatternItems; private Measure measure; - private String sourcePattern,targetPattern; - private String blockStartTimestamp,lastBlockStartTimestamp; + private String sourcePattern, targetPattern; + private String blockStartTimestamp, lastBlockStartTimestamp; private String interval; private String uri; private RestTemplate restTemplate = new RestTemplate(); @@ -80,54 +79,82 @@ public class SparkSubmitJob implements Job { /** * execute method is used to submit sparkJobDO to Livy. 
+ * * @param context */ @Override public void execute(JobExecutionContext context) { JobDetail jd = context.getJobDetail(); - String groupName=jd.getJobDataMap().getString("groupName"); - String jobName=jd.getJobDataMap().getString("jobName"); - init(jd); + String groupName = jd.getJobDataMap().getString("groupName"); + String jobName = jd.getJobDataMap().getString("jobName"); + initParam(jd); //prepare current system timestamp - long currentblockStartTimestamp = setCurrentblockStartTimestamp(System.currentTimeMillis()); - LOGGER.info("currentblockStartTimestamp: "+currentblockStartTimestamp); - if (StringUtils.isNotEmpty(sourcePattern)) { - sourcePatternItems = sourcePattern.split("-"); - setDataConnectorPartitions(measure.getSource(), sourcePatternItems, partitionItems, currentblockStartTimestamp); - } - if (StringUtils.isNotEmpty(targetPattern)) { - targetPatternItems = targetPattern.split("-"); - setDataConnectorPartitions(measure.getTarget(), targetPatternItems, partitionItems, currentblockStartTimestamp); + long currentBlockStartTimestamp = setCurrentBlockStartTimestamp(System.currentTimeMillis()); + LOGGER.info("currentBlockStartTimestamp: {}", currentBlockStartTimestamp); + try { + if (StringUtils.isNotEmpty(sourcePattern)) + setAllDataConnectorPartitions(measure.getDataSources(), sourcePattern.split("-"), partitionItems, "source", currentBlockStartTimestamp); + if (StringUtils.isNotEmpty(targetPattern)) + setAllDataConnectorPartitions(measure.getDataSources(), targetPattern.split("-"), partitionItems, "target", currentBlockStartTimestamp); + } catch (Exception e) { + LOGGER.error("Can not execute job.Set partitions error. 
{}", e.getMessage()); + return; } - jd.getJobDataMap().put("lastBlockStartTimestamp", currentblockStartTimestamp + ""); + jd.getJobDataMap().put("lastBlockStartTimestamp", currentBlockStartTimestamp + ""); setSparkJobDO(); - String result = restTemplate.postForObject(uri, sparkJobDO, String.class); + String result; + try { + result = restTemplate.postForObject(uri, sparkJobDO, String.class); + } catch (Exception e) { + LOGGER.error("Post spark task error. {}", e.getMessage()); + return; + } LOGGER.info(result); - saveJobInstance(groupName,jobName,result); + saveJobInstance(groupName, jobName, result); } - public void init(JobDetail jd){ - //jd.getJobDataMap().getString() + private void initParam(JobDetail jd) { /** * the field measureId is generated from `setJobData` in `JobServiceImpl` */ String measureId = jd.getJobDataMap().getString("measureId"); measure = measureRepo.findOne(Long.valueOf(measureId)); - if (measure==null) { - LOGGER.error("Measure with id " + measureId + " is not find!"); - //if return here, livy uri won't be set, and will keep null for all measures even they are not null + if (measure == null) { + LOGGER.error("Measure with id {} is not find!", measureId); + return; } - String partitionItemstr = sparkJobProps.getProperty("sparkJob.dateAndHour"); - partitionItems = partitionItemstr.split(","); + setMeasureInstanceName(measure, jd); + partitionItems = sparkJobProps.getProperty("sparkJob.dateAndHour").split(","); uri = sparkJobProps.getProperty("livy.uri"); sourcePattern = jd.getJobDataMap().getString("sourcePattern"); targetPattern = jd.getJobDataMap().getString("targetPattern"); blockStartTimestamp = jd.getJobDataMap().getString("blockStartTimestamp"); lastBlockStartTimestamp = jd.getJobDataMap().getString("lastBlockStartTimestamp"); - LOGGER.info("lastBlockStartTimestamp:"+lastBlockStartTimestamp); + LOGGER.info("lastBlockStartTimestamp:{}", lastBlockStartTimestamp); interval = jd.getJobDataMap().getString("interval"); } + private void 
setMeasureInstanceName(Measure measure, JobDetail jd) { + // in order to keep metric name unique, we set measure name as jobName at present + measure.setName(jd.getJobDataMap().getString("jobName")); + } + + private void setAllDataConnectorPartitions(List<DataSource> sources, String[] patternItemSet, String[] partitionItems, String sourceName, long timestamp) { + if (sources == null) + return; + for (DataSource dataSource : sources) { + setDataSourcePartitions(dataSource, patternItemSet, partitionItems, sourceName, timestamp); + } + } + + private void setDataSourcePartitions(DataSource dataSource, String[] patternItemSet, String[] partitionItems, String sourceName, long timestamp) { + String name = dataSource.getName(); + for (DataConnector dataConnector : dataSource.getConnectors()) { + if (sourceName.equals(name)) + setDataConnectorPartitions(dataConnector, patternItemSet, partitionItems, timestamp); + } + } + private void setDataConnectorPartitions(DataConnector dc, String[] patternItemSet, String[] partitionItems, long timestamp) { Map<String, String> partitionItemMap = genPartitionMap(patternItemSet, partitionItems, timestamp); /** @@ -141,67 +168,64 @@ public class SparkSubmitJob implements Job { try { dc.setConfig(configMap); } catch (JsonProcessingException e) { - LOGGER.error(""+e); + LOGGER.error(e.getMessage()); } } - public Map<String, String> genPartitionMap(String[] patternItemSet, String[] partitionItems, long timestamp) { + + private Map<String, String> genPartitionMap(String[] patternItemSet, String[] partitionItems, long timestamp) { /** * patternItemSet:{YYYYMMdd,HH} * partitionItems:{dt,hour} * partitionItemMap:{dt=20170804,hour=09} */ - int comparableSizeMin=Math.min(patternItemSet.length,partitionItems.length); + int comparableSizeMin = Math.min(patternItemSet.length, partitionItems.length); Map<String, String> partitionItemMap = new HashMap<>(); for (int i = 0; i < comparableSizeMin; i++) { /** * in order to get a standard date like 20170427 
01 (YYYYMMdd-HH) */ - String pattrn = patternItemSet[i].replace("mm", "MM"); - pattrn = pattrn.replace("DD", "dd"); - pattrn = pattrn.replace("hh", "HH"); - SimpleDateFormat sdf = new SimpleDateFormat(pattrn); + String pattern = patternItemSet[i].replace("mm", "MM"); + pattern = pattern.replace("DD", "dd"); + pattern = pattern.replace("hh", "HH"); + SimpleDateFormat sdf = new SimpleDateFormat(pattern); partitionItemMap.put(partitionItems[i], sdf.format(new Date(timestamp))); } return partitionItemMap; } - public long setCurrentblockStartTimestamp(long currentSystemTimestamp) { - long currentblockStartTimestamp=0; + private long setCurrentBlockStartTimestamp(long currentSystemTimestamp) { + long currentBlockStartTimestamp = 0; if (StringUtils.isNotEmpty(lastBlockStartTimestamp)) { try { - currentblockStartTimestamp = Long.parseLong(lastBlockStartTimestamp) + Integer.parseInt(interval) * 1000; - }catch (Exception e){ - LOGGER.info("lastBlockStartTimestamp or interval format problem! "+e); + currentBlockStartTimestamp = Long.parseLong(lastBlockStartTimestamp) + Integer.parseInt(interval) * 1000; + } catch (Exception e) { + LOGGER.info("lastBlockStartTimestamp or interval format problem! {}", e.getMessage()); } } else { if (StringUtils.isNotEmpty(blockStartTimestamp)) { - try{ - currentblockStartTimestamp = Long.parseLong(blockStartTimestamp); - }catch (Exception e){ - LOGGER.info("blockStartTimestamp format problem! "+e); + try { + currentBlockStartTimestamp = Long.parseLong(blockStartTimestamp); + } catch (Exception e) { + LOGGER.info("blockStartTimestamp format problem! 
{}", e.getMessage()); } } else { - currentblockStartTimestamp = currentSystemTimestamp; + currentBlockStartTimestamp = currentSystemTimestamp; } } - return currentblockStartTimestamp; + return currentBlockStartTimestamp; } - public void setSparkJobDO() { + private void setSparkJobDO() { sparkJobDO.setFile(sparkJobProps.getProperty("sparkJob.file")); sparkJobDO.setClassName(sparkJobProps.getProperty("sparkJob.className")); - List<String> args = new ArrayList<String>(); + List<String> args = new ArrayList<>(); args.add(sparkJobProps.getProperty("sparkJob.args_1")); - ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter(); - String measureJson = ""; - try { - measureJson = ow.writeValueAsString(measure); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } + // measure + String measureJson; + measureJson = GriffinUtil.toJson(measure); args.add(measureJson); //partition args.add(sparkJobProps.getProperty("sparkJob.args_3")); sparkJobDO.setArgs(args); @@ -213,8 +237,8 @@ public class SparkSubmitJob implements Job { sparkJobDO.setDriverMemory(sparkJobProps.getProperty("sparkJob.driverMemory")); sparkJobDO.setExecutorMemory(sparkJobProps.getProperty("sparkJob.executorMemory")); - Map<String,String> conf=new HashMap<String,String>(); - conf.put("spark.jars.packages",sparkJobProps.getProperty("sparkJob.spark.jars.packages")); + Map<String, String> conf = new HashMap<>(); + conf.put("spark.jars.packages", sparkJobProps.getProperty("sparkJob.spark.jars.packages")); sparkJobDO.setConf(conf); List<String> jars = new ArrayList<>(); @@ -227,25 +251,26 @@ public class SparkSubmitJob implements Job { sparkJobDO.setFiles(files); } - public void saveJobInstance(String groupName,String jobName,String result){ + private void saveJobInstance(String groupName, String jobName, String result) { //save JobInstance info into DataBase - Map<String,Object> resultMap=new HashMap<String,Object>(); - TypeReference<HashMap<String,Object>> type=new 
TypeReference<HashMap<String,Object>>(){}; + Map<String, Object> resultMap = new HashMap<>(); + TypeReference<HashMap<String, Object>> type = new TypeReference<HashMap<String, Object>>() { + }; try { - resultMap= GriffinUtil.toEntity(result,type); + resultMap = GriffinUtil.toEntity(result, type); } catch (IOException e) { - LOGGER.error("jobInstance jsonStr convert to map failed. "+e); + LOGGER.error("jobInstance jsonStr convert to map failed. {}", e.getMessage()); } - JobInstance jobInstance=new JobInstance(); - if(resultMap!=null) { + JobInstance jobInstance = new JobInstance(); + if (resultMap != null) { jobInstance.setGroupName(groupName); jobInstance.setJobName(jobName); try { jobInstance.setSessionId(Integer.parseInt(resultMap.get("id").toString())); jobInstance.setState(LivySessionStates.State.valueOf(resultMap.get("state").toString())); jobInstance.setAppId(resultMap.get("appId").toString()); - }catch (Exception e){ - LOGGER.warn("jobInstance has null field. "+e); + } catch (Exception e) { + LOGGER.warn("jobInstance has null field. {}", e.getMessage()); } jobInstance.setTimestamp(System.currentTimeMillis()); jobInstanceRepo.save(jobInstance); http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/43f9dbf7/service/src/main/java/org/apache/griffin/core/job/entity/JobInstance.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobInstance.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobInstance.java index ab90dd3..4521999 100644 --- a/service/src/main/java/org/apache/griffin/core/job/entity/JobInstance.java +++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobInstance.java @@ -24,9 +24,6 @@ import org.apache.griffin.core.measure.entity.AuditableEntity; import javax.persistence.*; -/** - * Created by xiangrchen on 5/31/17. 
- */ @Entity public class JobInstance extends AuditableEntity { @@ -36,12 +33,12 @@ public class JobInstance extends AuditableEntity { private String jobName; private int sessionId; @Enumerated(EnumType.STRING) - State state; - String appId; + private State state; + private String appId; @Lob - @Column(length=1024) //2^10=1024 + @Column(length = 1024) //2^10=1024 private String appUri; - long timestamp; + private long timestamp; public String getGroupName() { return groupName; http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/43f9dbf7/service/src/main/java/org/apache/griffin/core/job/entity/JobRequestBody.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobRequestBody.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobRequestBody.java index 796949e..dd28bf1 100644 --- a/service/src/main/java/org/apache/griffin/core/job/entity/JobRequestBody.java +++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobRequestBody.java @@ -18,9 +18,6 @@ under the License. */ package org.apache.griffin.core.job.entity; -/** - * Created by xiangrchen on 4/27/17. 
- */ public class JobRequestBody { private String sourcePattern; private String targetPattern; http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/43f9dbf7/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java b/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java index 989bc44..5839fb5 100644 --- a/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java +++ b/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java @@ -21,9 +21,6 @@ package org.apache.griffin.core.job.entity; import com.cloudera.livy.sessions.SessionState; -/** - * Created by xiangrchen on 7/31/17. - */ public class LivySessionStates { /** @@ -44,8 +41,8 @@ public class LivySessionStates { unknown } - public static SessionState toSessionState(State state){ - switch (state){ + public static SessionState toSessionState(State state) { + switch (state) { case not_started: return new SessionState.NotStarted(); case starting: @@ -71,21 +68,21 @@ public class LivySessionStates { } } - public static boolean isActive(State state){ - if (State.unknown.equals(state)){ + public static boolean isActive(State state) { + if (State.unknown.equals(state)) { // set unknown isactive() as false. 
return false; } - SessionState sessionState=toSessionState(state); - if (sessionState==null){ + SessionState sessionState = toSessionState(state); + if (sessionState == null) { return false; - }else { + } else { return sessionState.isActive(); } } - public static boolean isHeathy(State state){ - if (State.error.equals(state) || State.dead.equals(state) || State.shutting_down.equals(state)){ + public static boolean isHeathy(State state) { + if (State.error.equals(state) || State.dead.equals(state) || State.shutting_down.equals(state)) { return false; } return true; http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/43f9dbf7/service/src/main/java/org/apache/griffin/core/job/entity/SparkJobDO.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/SparkJobDO.java b/service/src/main/java/org/apache/griffin/core/job/entity/SparkJobDO.java index c447260..437cde7 100644 --- a/service/src/main/java/org/apache/griffin/core/job/entity/SparkJobDO.java +++ b/service/src/main/java/org/apache/griffin/core/job/entity/SparkJobDO.java @@ -24,39 +24,35 @@ import java.util.List; import java.util.Map; /** - * Created by xiangrchen on 4/26/17. 
- */ - -/** * SparkJobDO - { - "file": "hdfs:///griffin/griffin-measure.jar", - "className": "org.apache.griffin.measure.batch.Application", - "args": [ - "/benchmark/test/env.json", - "{\"name\":\"data_rdm\",\"type\":\"accuracy\",\"source\":{\"type\":\"hive\",\"version\":\"1.2\",\"config\":{\"database\":\"default\",\"table.name\":\"data_rdm\"} },\"target\":{\"type\":\"hive\",\"version\":\"1.2\",\"config\":{\"database\":\"default\",\"table.name\":\"data_rdm\"} },\"evaluateRule\":{\"sampleRatio\":1,\"rules\":\"$source.uage > 100 AND $source.uid = $target.uid AND $source.uage + 12 = $target.uage + 10 + 2 AND $source.udes + 11 = $target.udes + 1 + 1\"} }", - "hdfs,raw" - ], - "name": "griffin-livy", - "queue": "default", - "numExecutors": 2, - "executorCores": 4, - "driverMemory": "2g", - "executorMemory": "2g", - "conf": { - "spark.jars.packages": "com.databricks:spark-avro_2.10:2.0.1" - }, - "jars": [ - "/livy/datanucleus-api-jdo-3.2.6.jar", - "/livy/datanucleus-core-3.2.10.jar", - "/livy/datanucleus-rdbms-3.2.9.jar" - ], - "files": [ - "/livy/hive-site.xml" - ] - }' + * { + * "file": "hdfs:///griffin/griffin-measure.jar", + * "className": "org.apache.griffin.measure.batch.Application", + * "args": [ + * "/benchmark/test/env.json", + * "{\"name\":\"data_rdm\",\"type\":\"accuracy\",\"source\":{\"type\":\"hive\",\"version\":\"1.2\",\"config\":{\"database\":\"default\",\"table.name\":\"data_rdm\"} },\"target\":{\"type\":\"hive\",\"version\":\"1.2\",\"config\":{\"database\":\"default\",\"table.name\":\"data_rdm\"} },\"evaluateRule\":{\"sampleRatio\":1,\"rules\":\"$source.uage > 100 AND $source.uid = $target.uid AND $source.uage + 12 = $target.uage + 10 + 2 AND $source.udes + 11 = $target.udes + 1 + 1\"} }", + * "hdfs,raw" + * ], + * "name": "griffin-livy", + * "queue": "default", + * "numExecutors": 2, + * "executorCores": 4, + * "driverMemory": "2g", + * "executorMemory": "2g", + * "conf": { + * "spark.jars.packages": "com.databricks:spark-avro_2.10:2.0.1" + * }, + * 
"jars": [ + * "/livy/datanucleus-api-jdo-3.2.6.jar", + * "/livy/datanucleus-core-3.2.10.jar", + * "/livy/datanucleus-rdbms-3.2.9.jar" + * ], + * "files": [ + * "/livy/hive-site.xml" + * ] + * }' */ -public class SparkJobDO implements Serializable{ +public class SparkJobDO implements Serializable { private String file; @@ -76,7 +72,7 @@ public class SparkJobDO implements Serializable{ private String executorMemory; - private Map<String,String> conf; + private Map<String, String> conf; private List<String> jars; http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/43f9dbf7/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java index 0bce562..d07b2b7 100644 --- a/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java +++ b/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java @@ -25,17 +25,15 @@ import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.CrudRepository; import org.springframework.stereotype.Repository; -import org.springframework.transaction.annotation.Transactional; import java.util.List; @Repository -public interface JobInstanceRepo extends CrudRepository<JobInstance,Long>{ +public interface JobInstanceRepo extends CrudRepository<JobInstance, Long> { /** - * - * @param group is group name - * @param name is job name + * @param group is group name + * @param name is job name * @param pageable * @return all job instances scheduled at different time using the same prototype job, * the prototype job is determined by SCHED_NAME, group name and job name in table QRTZ_JOB_DETAILS. 
@@ -49,18 +47,16 @@ public interface JobInstanceRepo extends CrudRepository<JobInstance,Long>{ "where s.groupName= ?1 and s.jobName=?2 ") List<JobInstance> findByGroupNameAndJobName(String group, String name); - @Query("select DISTINCT s.groupName, s.jobName from JobInstance s") + @Query("select DISTINCT s.groupName, s.jobName, s.id from JobInstance s") List<Object> findGroupWithJobName(); - @Transactional @Modifying @Query("delete from JobInstance s " + "where s.groupName= ?1 and s.jobName=?2 ") - void deleteByGroupAndjobName(String groupName, String jobName); + void deleteByGroupAndJobName(String groupName, String jobName); - @Transactional @Modifying - @Query("update JobInstance s "+ + @Query("update JobInstance s " + "set s.state= ?2, s.appId= ?3, s.appUri= ?4 where s.id= ?1") void update(Long Id, LivySessionStates.State state, String appId, String appUri); http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/43f9dbf7/service/src/main/java/org/apache/griffin/core/login/LoginController.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/login/LoginController.java b/service/src/main/java/org/apache/griffin/core/login/LoginController.java index 7a5f5af..2e75a81 100644 --- a/service/src/main/java/org/apache/griffin/core/login/LoginController.java +++ b/service/src/main/java/org/apache/griffin/core/login/LoginController.java @@ -23,11 +23,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; -import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.*; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import 
org.springframework.web.bind.annotation.RestController; -import java.util.HashMap; import java.util.Map; @RestController http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/43f9dbf7/service/src/main/java/org/apache/griffin/core/login/LoginService.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/login/LoginService.java b/service/src/main/java/org/apache/griffin/core/login/LoginService.java index 83b9c48..bdb5a64 100644 --- a/service/src/main/java/org/apache/griffin/core/login/LoginService.java +++ b/service/src/main/java/org/apache/griffin/core/login/LoginService.java @@ -23,13 +23,13 @@ import org.springframework.http.ResponseEntity; import java.util.Map; -public interface LoginService { +public interface LoginService { - public ResponseEntity<Map<String, Object>> login(Map<String, String> map); + ResponseEntity<Map<String, Object>> login(Map<String, String> map); - public ResponseEntity<Map<String, Object>> loginDefault(Map<String, String> map); + ResponseEntity<Map<String, Object>> loginDefault(Map<String, String> map); - public ResponseEntity<Map<String, Object>> loginLDAP(Map<String, String> map); + ResponseEntity<Map<String, Object>> loginLDAP(Map<String, String> map); } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/43f9dbf7/service/src/main/java/org/apache/griffin/core/login/LoginServiceImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/login/LoginServiceImpl.java b/service/src/main/java/org/apache/griffin/core/login/LoginServiceImpl.java index 7598feb..5f8a069 100644 --- a/service/src/main/java/org/apache/griffin/core/login/LoginServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/login/LoginServiceImpl.java @@ -35,7 +35,9 @@ import javax.naming.directory.SearchControls; import javax.naming.directory.SearchResult; import 
javax.naming.ldap.InitialLdapContext; import javax.naming.ldap.LdapContext; -import java.util.*; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Map; @Service public class LoginServiceImpl implements LoginService { @@ -45,29 +47,31 @@ public class LoginServiceImpl implements LoginService { private Environment env; @Override - public ResponseEntity<Map<String, Object>> login(Map<String, String> map){ + public ResponseEntity<Map<String, Object>> login(Map<String, String> map) { String strategy = env.getProperty("login.strategy"); - switch (strategy){ - case "ldap": return loginLDAP(map); - case "default":return loginDefault(map); + switch (strategy) { + case "ldap": + return loginLDAP(map); + case "default": + return loginDefault(map); default: { LOGGER.error("Missing login strategy configuration"); - return new ResponseEntity<Map<String, Object>>(new HashMap<String,Object>(), HttpStatus.NOT_FOUND); + return new ResponseEntity<Map<String, Object>>(new HashMap<String, Object>(), HttpStatus.NOT_FOUND); } } } @Override - public ResponseEntity<Map<String, Object>> loginDefault(Map<String, String> map){ + public ResponseEntity<Map<String, Object>> loginDefault(Map<String, String> map) { String username = map.get("username"); String password = map.get("password"); - if(username == null || password == null){ + if (username == null || password == null) { LOGGER.error("Missing default login input"); return null; } String fullName = null; - if(username.equals("user")){ - if(password.equals("test")){ + if (username.equals("user")) { + if (password.equals("test")) { fullName = "Default"; } } @@ -78,7 +82,7 @@ public class LoginServiceImpl implements LoginService { public ResponseEntity<Map<String, Object>> loginLDAP(Map<String, String> map) { String ntAccount = map.get("username"); String password = map.get("password"); - if(ntAccount == null || password == null){ + if (ntAccount == null || password == null) { LOGGER.error("Missing ldap login input"); 
return null; } @@ -86,10 +90,10 @@ public class LoginServiceImpl implements LoginService { return getResponse(ntAccount, fullName); } - private String searchLDAP(String ntAccount, String password){ + private String searchLDAP(String ntAccount, String password) { String domainComponent = env.getProperty("ldap.dc"); Hashtable<String, String> ht = getLDAPEnvironmrnt(ntAccount, password); - if(domainComponent == null || ht == null){ + if (domainComponent == null || ht == null) { return null; } LdapContext ctx; @@ -106,7 +110,7 @@ public class LoginServiceImpl implements LoginService { Attributes attrs = searchResult.getAttributes(); if (attrs != null && attrs.get("cn") != null) { String cnName = (String) attrs.get("cn").get(); - if(cnName.indexOf("(") > 0){ + if (cnName.indexOf("(") > 0) { fullName = cnName.substring(0, cnName.indexOf("(")); } } @@ -118,12 +122,12 @@ public class LoginServiceImpl implements LoginService { return null; } - private Hashtable<String, String> getLDAPEnvironmrnt(String ntAccount, String password){ + private Hashtable<String, String> getLDAPEnvironmrnt(String ntAccount, String password) { String ldapUrl = env.getProperty("ldap.url"); String domain = env.getProperty("ldap.domain"); String connectTimeout = env.getProperty("ldap.connect-timeout"); String readTimeout = env.getProperty("ldap.read-timeout"); - if(ldapUrl == null || domain == null ||connectTimeout == null || readTimeout == null){ + if (ldapUrl == null || domain == null || connectTimeout == null || readTimeout == null) { LOGGER.error("Missing ldap properties"); return null; } @@ -139,14 +143,14 @@ public class LoginServiceImpl implements LoginService { return ht; } - private ResponseEntity<Map<String,Object>> getResponse(String ntAccount, String fullName){ - Map<String,Object> message = new HashMap<String,Object>(); - if(fullName!=null){ + private ResponseEntity<Map<String, Object>> getResponse(String ntAccount, String fullName) { + Map<String, Object> message = new HashMap<String, 
Object>(); + if (fullName != null) { message.put("ntAccount", ntAccount); message.put("fullName", fullName); message.put("status", 0); return new ResponseEntity<Map<String, Object>>(message, HttpStatus.OK); - }else { + } else { return new ResponseEntity<Map<String, Object>>(message, HttpStatus.NOT_FOUND); } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/43f9dbf7/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java index 5ad8893..f93ce6a 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java +++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java @@ -32,30 +32,30 @@ import java.util.Map; public class MeasureController { @Autowired - MeasureService measureService; + private MeasureService measureService; - @RequestMapping(value = "/measures",method = RequestMethod.GET) + @RequestMapping(value = "/measures", method = RequestMethod.GET) public Iterable<Measure> getAllAliveMeasures() { return measureService.getAllAliveMeasures(); } - @RequestMapping(value = "/measure/{id}",method = RequestMethod.GET) + @RequestMapping(value = "/measure/{id}", method = RequestMethod.GET) public Measure getMeasureById(@PathVariable("id") long id) { return measureService.getMeasureById(id); } - @RequestMapping(value = "/measure/{id}",method = RequestMethod.DELETE) + @RequestMapping(value = "/measure/{id}", method = RequestMethod.DELETE) public GriffinOperationMessage deleteMeasureById(@PathVariable("id") Long id) { return measureService.deleteMeasureById(id); } - @RequestMapping(value = "/measure",method = RequestMethod.PUT) + @RequestMapping(value = "/measure", method = RequestMethod.PUT) public GriffinOperationMessage updateMeasure(@RequestBody 
Measure measure) { return measureService.updateMeasure(measure); } - @RequestMapping(value = "/measures/owner/{owner}",method = RequestMethod.GET) - public List<Map<String, String>> getAllAliveMeasureNameIdByOwner(@PathVariable("owner") String owner){ + @RequestMapping(value = "/measures/owner/{owner}", method = RequestMethod.GET) + public List<Map<String, String>> getAllAliveMeasureNameIdByOwner(@PathVariable("owner") String owner) { return measureService.getAllAliveMeasureNameIdByOwner(owner); } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/43f9dbf7/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java index 61b16b6..021af17 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java @@ -27,7 +27,6 @@ import org.apache.griffin.core.util.GriffinOperationMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.quartz.SchedulerFactoryBean; import org.springframework.stereotype.Service; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; @@ -42,11 +41,10 @@ public class MeasureServiceImpl implements MeasureService { private static final Logger LOGGER = LoggerFactory.getLogger(MeasureServiceImpl.class); @Autowired - JobServiceImpl jobService; + private JobServiceImpl jobService; @Autowired private MeasureRepo measureRepo; - @Autowired - private SchedulerFactoryBean factory; + @Override public Iterable<Measure> getAllAliveMeasures() { return measureRepo.findByDeleted(false); @@ -57,12 +55,9 @@ public 
class MeasureServiceImpl implements MeasureService { return measureRepo.findOne(id); } - - - @Override public GriffinOperationMessage deleteMeasureById(Long measureId) { - if (measureRepo.exists(measureId) == false) { + if (!measureRepo.exists(measureId)) { return GriffinOperationMessage.RESOURCE_NOT_FOUND; } else { //pause all jobs related to the measure @@ -74,18 +69,23 @@ public class MeasureServiceImpl implements MeasureService { } } - @Override public GriffinOperationMessage createMeasure(Measure measure) { List<Measure> aliveMeasureList = measureRepo.findByNameAndDeleted(measure.getName(), false); if (aliveMeasureList.size() == 0) { - if (measureRepo.save(measure) != null) - return GriffinOperationMessage.CREATE_MEASURE_SUCCESS; - else { + try { + if (measureRepo.save(measure) != null) + return GriffinOperationMessage.CREATE_MEASURE_SUCCESS; + else { + return GriffinOperationMessage.CREATE_MEASURE_FAIL; + } + } catch (Exception e) { + LOGGER.info("Failed to create new measure {}.{}", measure.getName(), e.getMessage()); return GriffinOperationMessage.CREATE_MEASURE_FAIL; } + } else { - LOGGER.warn("Failed to create new measure " + measure.getName() + ", it already exists"); + LOGGER.info("Failed to create new measure {}, it already exists.", measure.getName()); return GriffinOperationMessage.CREATE_MEASURE_FAIL_DUPLICATE; } } @@ -93,7 +93,7 @@ public class MeasureServiceImpl implements MeasureService { @Override public List<Map<String, String>> getAllAliveMeasureNameIdByOwner(String owner) { List<Map<String, String>> res = new ArrayList<>(); - for(Measure measure: measureRepo.findByOwnerAndDeleted(owner, false)){ + for (Measure measure : measureRepo.findByOwnerAndDeleted(owner, false)) { HashMap<String, String> map = new HashMap<>(); map.put("name", measure.getName()); map.put("id", measure.getId().toString()); @@ -103,10 +103,16 @@ public class MeasureServiceImpl implements MeasureService { } public GriffinOperationMessage updateMeasure(@RequestBody Measure 
measure) { - if (measureRepo.exists(measure.getId()) == false) { + if (!measureRepo.exists(measure.getId())) { return GriffinOperationMessage.RESOURCE_NOT_FOUND; } else { - measureRepo.save(measure); + try { + measureRepo.save(measure); + } catch (Exception e) { + LOGGER.error("Failed to update measure. {}", e.getMessage()); + return GriffinOperationMessage.UPDATE_MEASURE_FAIL; + } + return GriffinOperationMessage.UPDATE_MEASURE_SUCCESS; } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/43f9dbf7/service/src/main/java/org/apache/griffin/core/measure/entity/AuditableEntity.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/AuditableEntity.java b/service/src/main/java/org/apache/griffin/core/measure/entity/AuditableEntity.java index 9639f9c..546bad9 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/entity/AuditableEntity.java +++ b/service/src/main/java/org/apache/griffin/core/measure/entity/AuditableEntity.java @@ -35,7 +35,7 @@ public abstract class AuditableEntity implements Serializable { // private static final long serialVersionUID = 1L; @Id - @GeneratedValue(strategy=GenerationType.AUTO) + @GeneratedValue(strategy = GenerationType.AUTO) private Long id; @JsonIgnore @@ -52,11 +52,21 @@ public abstract class AuditableEntity implements Serializable { this.id = id; } - public Timestamp getCreatedDate() { return createdDate; } - public void setCreatedDate(Timestamp createdDate) { this.createdDate = createdDate; } + public Timestamp getCreatedDate() { + return createdDate; + } + + public void setCreatedDate(Timestamp createdDate) { + this.createdDate = createdDate; + } - public Timestamp getModifiedDate() { return modifiedDate; } - public void setModifiedDate(Timestamp modifiedDate) { this.modifiedDate = modifiedDate; } + public Timestamp getModifiedDate() { + return modifiedDate; + } + + public void setModifiedDate(Timestamp 
modifiedDate) { + this.modifiedDate = modifiedDate; + } @Override public int hashCode() {
