Fix code style violations against Griffin rules Author: Eugene <[email protected]>
Closes #388 from toyboxman/wrap. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/3bbbcb32 Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/3bbbcb32 Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/3bbbcb32 Branch: refs/heads/master Commit: 3bbbcb32686bb691bdef9af71ef48685d04ea63f Parents: 10f8103 Author: Eugene <[email protected]> Authored: Thu Aug 16 18:57:19 2018 +0800 Committer: William Guo <[email protected]> Committed: Thu Aug 16 18:57:19 2018 +0800 ---------------------------------------------------------------------- .../griffin/core/GriffinWebApplication.java | 5 +- .../core/config/EclipseLinkJpaConfig.java | 10 +- .../apache/griffin/core/config/EnvConfig.java | 10 +- .../apache/griffin/core/config/LoginConfig.java | 3 +- .../griffin/core/config/PropertiesConfig.java | 32 ++-- .../griffin/core/config/SchedulerConfig.java | 6 +- .../core/exception/GriffinExceptionHandler.java | 19 +- .../core/exception/GriffinExceptionMessage.java | 37 +++- .../exception/GriffinExceptionResponse.java | 6 +- .../griffin/core/job/BatchJobOperatorImpl.java | 109 ++++++++---- .../griffin/core/job/FileExistPredicator.java | 12 +- .../apache/griffin/core/job/JobController.java | 29 ++- .../apache/griffin/core/job/JobInstance.java | 94 ++++++---- .../apache/griffin/core/job/JobOperator.java | 9 +- .../apache/griffin/core/job/JobServiceImpl.java | 175 +++++++++++++------ .../apache/griffin/core/job/SparkSubmitJob.java | 50 ++++-- .../core/job/StreamingJobOperatorImpl.java | 97 ++++++---- .../griffin/core/job/entity/AbstractJob.java | 29 ++- .../griffin/core/job/entity/BatchJob.java | 10 +- .../griffin/core/job/entity/JobDataSegment.java | 12 +- .../core/job/entity/JobInstanceBean.java | 12 +- .../griffin/core/job/entity/JobState.java | 3 +- .../core/job/entity/LivySessionStates.java | 15 +- .../core/job/entity/SegmentPredicate.java | 8 +- 
.../griffin/core/job/entity/StreamingJob.java | 3 +- .../factory/AutowiringSpringBeanJobFactory.java | 3 +- .../core/job/factory/PredicatorFactory.java | 3 +- .../griffin/core/job/repo/JobInstanceRepo.java | 9 +- .../apache/griffin/core/job/repo/JobRepo.java | 3 +- .../griffin/core/login/LoginController.java | 3 +- .../core/login/LoginServiceDefaultImpl.java | 2 +- .../core/login/LoginServiceLdapImpl.java | 24 ++- .../measure/ExternalMeasureOperatorImpl.java | 6 +- .../measure/GriffinMeasureOperatorImpl.java | 3 +- .../griffin/core/measure/MeasureController.java | 12 +- .../core/measure/MeasureOrgController.java | 4 +- .../griffin/core/measure/MeasureOrgService.java | 5 +- .../core/measure/MeasureOrgServiceImpl.java | 19 +- .../core/measure/MeasureServiceImpl.java | 27 ++- .../core/measure/entity/DataConnector.java | 32 ++-- .../griffin/core/measure/entity/DataSource.java | 8 +- .../core/measure/entity/EvaluateRule.java | 3 +- .../core/measure/entity/ExternalMeasure.java | 3 +- .../core/measure/entity/GriffinMeasure.java | 22 ++- .../griffin/core/measure/entity/Measure.java | 10 +- .../griffin/core/measure/entity/Rule.java | 25 ++- .../measure/entity/StreamingPreProcess.java | 5 +- .../metastore/hive/HiveMetaStoreController.java | 3 +- .../core/metastore/hive/HiveMetaStoreProxy.java | 13 +- .../hive/HiveMetaStoreServiceImpl.java | 32 ++-- .../metastore/kafka/KafkaSchemaController.java | 6 +- .../metastore/kafka/KafkaSchemaServiceImpl.java | 21 ++- .../griffin/core/metric/MetricController.java | 17 +- .../griffin/core/metric/MetricService.java | 3 +- .../griffin/core/metric/MetricServiceImpl.java | 48 +++-- .../apache/griffin/core/metric/MetricStore.java | 6 +- .../griffin/core/metric/MetricStoreImpl.java | 81 ++++++--- .../griffin/core/metric/model/Metric.java | 3 +- .../org/apache/griffin/core/util/FSUtil.java | 33 ++-- .../org/apache/griffin/core/util/FileUtil.java | 9 +- .../org/apache/griffin/core/util/JsonUtil.java | 24 ++- 
.../apache/griffin/core/util/MeasureUtil.java | 12 +- .../griffin/core/util/PropertiesUtil.java | 10 +- .../org/apache/griffin/core/util/TimeUtil.java | 23 ++- .../apache/griffin/core/util/YarnNetUtil.java | 7 +- .../config/EclipseLinkJpaConfigForTest.java | 11 +- .../core/config/PropertiesConfigTest.java | 8 +- .../core/info/GriffinInfoControllerTest.java | 2 +- .../griffin/core/job/JobControllerTest.java | 35 ++-- .../core/job/JobInstanceBeanRepoTest.java | 27 ++- .../griffin/core/job/JobInstanceTest.java | 44 +++-- .../griffin/core/job/SparkSubmitJobTest.java | 44 +++-- .../core/job/repo/JobInstanceRepoTest.java | 8 +- .../griffin/core/job/repo/JobRepoTest.java | 11 +- .../ExternalMeasureOperatorImplTest.java | 9 +- .../measure/GriffinMeasureOperatorImplTest.java | 12 +- .../core/measure/MeasureControllerTest.java | 36 ++-- .../core/measure/MeasureOrgControllerTest.java | 3 +- .../core/measure/MeasureOrgServiceImplTest.java | 23 ++- .../core/measure/MeasureServiceImplTest.java | 69 +++++--- .../measure/repo/DataConnectorRepoTest.java | 17 +- .../core/measure/repo/MeasureRepoTest.java | 3 +- .../hive/HiveMetaStoreControllerTest.java | 28 ++- .../hive/HiveMetaStoreServiceImplTest.java | 9 +- .../kafka/KafkaSchemaControllerTest.java | 12 +- .../core/metric/MetricControllerTest.java | 49 ++++-- .../core/metric/MetricServiceImplTest.java | 54 ++++-- .../core/metric/MetricStoreImplTest.java | 9 +- .../apache/griffin/core/util/EntityHelper.java | 115 +++++++----- .../apache/griffin/core/util/JsonUtilTest.java | 15 +- .../griffin/core/util/PropertiesUtilTest.java | 11 +- .../apache/griffin/core/util/TimeUtilTest.java | 8 +- 92 files changed, 1374 insertions(+), 675 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java ---------------------------------------------------------------------- diff --git 
a/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java b/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java index e5f877b..4ed4773 100644 --- a/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java +++ b/service/src/main/java/org/apache/griffin/core/GriffinWebApplication.java @@ -31,7 +31,8 @@ import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableScheduling public class GriffinWebApplication { - private static final Logger LOGGER = LoggerFactory.getLogger(GriffinWebApplication.class); + private static final Logger LOGGER = LoggerFactory + .getLogger(GriffinWebApplication.class); public static void main(String[] args) { LOGGER.info("application start"); @@ -43,4 +44,4 @@ public class GriffinWebApplication { return new SimpleCORSFilter(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java b/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java index ffb013a..1493569 100644 --- a/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java +++ b/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java @@ -36,9 +36,10 @@ import org.springframework.transaction.jta.JtaTransactionManager; @Configuration @ComponentScan("org.apache.griffin.core") public class EclipseLinkJpaConfig extends JpaBaseConfiguration { - protected EclipseLinkJpaConfig(DataSource ds, JpaProperties properties, - ObjectProvider<JtaTransactionManager> jtm, - ObjectProvider<TransactionManagerCustomizers> tmc) { + protected EclipseLinkJpaConfig( + DataSource ds, JpaProperties properties, + ObjectProvider<JtaTransactionManager> jtm, + 
ObjectProvider<TransactionManagerCustomizers> tmc) { super(ds, properties, jtm, tmc); } @@ -51,7 +52,8 @@ public class EclipseLinkJpaConfig extends JpaBaseConfiguration { protected Map<String, Object> getVendorProperties() { Map<String, Object> map = new HashMap<>(); map.put(PersistenceUnitProperties.WEAVING, "false"); - map.put(PersistenceUnitProperties.DDL_GENERATION, "create-or-extend-tables"); + map.put(PersistenceUnitProperties.DDL_GENERATION, + "create-or-extend-tables"); return map; } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/config/EnvConfig.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/config/EnvConfig.java b/service/src/main/java/org/apache/griffin/core/config/EnvConfig.java index ebd5a46..8c075a4 100644 --- a/service/src/main/java/org/apache/griffin/core/config/EnvConfig.java +++ b/service/src/main/java/org/apache/griffin/core/config/EnvConfig.java @@ -74,7 +74,8 @@ public class EnvConfig { * @return String * @throws IOException io exception */ - private static String readEnvFromAbsolutePath(String path) throws IOException { + private static String readEnvFromAbsolutePath(String path) + throws IOException { if (path == null) { LOGGER.warn("Parameter path is null."); return null; @@ -101,7 +102,8 @@ public class EnvConfig { * @return String * @throws IOException io exception */ - static String getBatchEnv(String name, String defaultPath, String location) throws IOException { + static String getBatchEnv(String name, String defaultPath, String location) + throws IOException { if (ENV_BATCH != null) { return ENV_BATCH; } @@ -116,7 +118,9 @@ public class EnvConfig { return ENV_BATCH; } - static String getStreamingEnv(String name, String defaultPath, String location) + static String getStreamingEnv(String name, + String defaultPath, + String location) throws IOException { if (ENV_STREAMING != 
null) { return ENV_STREAMING; http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/config/LoginConfig.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/config/LoginConfig.java b/service/src/main/java/org/apache/griffin/core/config/LoginConfig.java index 57066a8..ca43751 100644 --- a/service/src/main/java/org/apache/griffin/core/config/LoginConfig.java +++ b/service/src/main/java/org/apache/griffin/core/config/LoginConfig.java @@ -47,7 +47,8 @@ public class LoginConfig { case "default": return new LoginServiceDefaultImpl(); case "ldap": - return new LoginServiceLdapImpl(url, email, searchBase, searchPattern); + return new LoginServiceLdapImpl(url, email, searchBase, + searchPattern); default: return null; } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java b/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java index 16e5525..3bdb02b 100644 --- a/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java +++ b/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java @@ -45,7 +45,8 @@ import org.springframework.core.io.ClassPathResource; @Configuration public class PropertiesConfig { - private static final Logger LOGGER = LoggerFactory.getLogger(PropertiesConfig.class); + private static final Logger LOGGER = LoggerFactory + .getLogger(PropertiesConfig.class); public static Map<String, Object> livyConfMap; @@ -53,9 +54,13 @@ public class PropertiesConfig { private String envLocation; - public PropertiesConfig(@Value("${external.config.location}") String configLocation, @Value("${external.env.location}") String 
envLocation) { - LOGGER.info("external.config.location : {}", configLocation != null ? configLocation : "null"); - LOGGER.info("external.env.location : {}", envLocation != null ? envLocation : "null"); + public PropertiesConfig( + @Value("${external.config.location}") String configLocation, + @Value("${external.env .location}") String envLocation) { + LOGGER.info("external.config.location : {}", + configLocation != null ? configLocation : "null"); + LOGGER.info("external.env.location : {}", + envLocation != null ? envLocation : "null"); this.configLocation = configLocation; this.envLocation = envLocation; } @@ -73,10 +78,12 @@ public class PropertiesConfig { } /** - * Config quartz.properties will be replaced if it's found in external.config.location setting. + * Config quartz.properties will be replaced if it's found in external + * .config.location setting. * * @return Properties - * @throws FileNotFoundException It'll throw FileNotFoundException when path is wrong. + * @throws FileNotFoundException It'll throw FileNotFoundException + * when path is wrong. */ @Bean(name = "quartzConf") public Properties quartzConf() throws FileNotFoundException { @@ -85,7 +92,10 @@ public class PropertiesConfig { return getConf(name, defaultPath, configLocation); } - private static void genLivyConf(String name, String defaultPath, String location) throws IOException { + private static void genLivyConf( + String name, + String defaultPath, + String location) throws IOException { if (livyConfMap != null) { return; } @@ -106,13 +116,15 @@ public class PropertiesConfig { * @return Map * @throws IOException io exception */ - private static Map<String, Object> readPropertiesFromResource(String path) throws IOException { + private static Map<String, Object> readPropertiesFromResource(String path) + throws IOException { if (path == null) { LOGGER.warn("Parameter path is null."); return null; } - // Be careful, here we use getInputStream() to convert path file to stream. 
- // It'll cause FileNotFoundException if you use getFile() to convert path file to File Object + // Be careful, here we use getInputStream() to convert path file to + // stream. It'll cause FileNotFoundException if you use getFile() + // to convert path file to File Object InputStream in = new ClassPathResource(path).getInputStream(); return toEntity(in, new TypeReference<Map<String, Object>>() { }); http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java b/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java index 8c71e89..7b6af51 100644 --- a/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java +++ b/service/src/main/java/org/apache/griffin/core/config/SchedulerConfig.java @@ -43,13 +43,15 @@ public class SchedulerConfig { @Bean public JobFactory jobFactory(ApplicationContext applicationContext) { - AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory(); + AutowiringSpringBeanJobFactory jobFactory = + new AutowiringSpringBeanJobFactory(); jobFactory.setApplicationContext(applicationContext); return jobFactory; } @Bean - public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource, JobFactory jobFactory) { + public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource, + JobFactory jobFactory) { SchedulerFactoryBean factory = new SchedulerFactoryBean(); factory.setOverwriteExistingJobs(true); factory.setDataSource(dataSource); http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionHandler.java ---------------------------------------------------------------------- diff --git 
a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionHandler.java b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionHandler.java index 0a105b4..d987da7 100644 --- a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionHandler.java +++ b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionHandler.java @@ -33,22 +33,29 @@ public class GriffinExceptionHandler { @SuppressWarnings("rawtypes") @ExceptionHandler(GriffinException.ServiceException.class) - public ResponseEntity handleGriffinExceptionOfServer(HttpServletRequest request, GriffinException.ServiceException e) { + public ResponseEntity handleGriffinExceptionOfServer( + HttpServletRequest request, + GriffinException.ServiceException e) { String message = e.getMessage(); Throwable cause = e.getCause(); - GriffinExceptionResponse body = new GriffinExceptionResponse(HttpStatus.INTERNAL_SERVER_ERROR, + GriffinExceptionResponse body = new GriffinExceptionResponse( + HttpStatus.INTERNAL_SERVER_ERROR, message, request.getRequestURI(), cause.getClass().getName()); return new ResponseEntity<>(body, HttpStatus.INTERNAL_SERVER_ERROR); } @SuppressWarnings("rawtypes") @ExceptionHandler(GriffinException.class) - public ResponseEntity handleGriffinExceptionOfClient(HttpServletRequest request, GriffinException e) { - ResponseStatus responseStatus = AnnotationUtils.findAnnotation(e.getClass(), ResponseStatus.class); + public ResponseEntity handleGriffinExceptionOfClient( + HttpServletRequest request, GriffinException e) { + ResponseStatus responseStatus = AnnotationUtils.findAnnotation( + e.getClass(), ResponseStatus.class); HttpStatus status = responseStatus.code(); String code = e.getMessage(); - GriffinExceptionMessage message = GriffinExceptionMessage.valueOf(Integer.valueOf(code)); - GriffinExceptionResponse body = new GriffinExceptionResponse(status, message, request.getRequestURI()); + GriffinExceptionMessage message = 
GriffinExceptionMessage + .valueOf(Integer.valueOf(code)); + GriffinExceptionResponse body = new GriffinExceptionResponse( + status, message, request.getRequestURI()); return new ResponseEntity<>(body, status); } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java index 14d987d..ae6a0ea 100644 --- a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java +++ b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java @@ -22,35 +22,57 @@ package org.apache.griffin.core.exception; public enum GriffinExceptionMessage { //400, "Bad Request" - MEASURE_TYPE_DOES_NOT_MATCH(40001, "Property 'measure.type' does not match the type of measure in request body"), - INVALID_CONNECTOR_NAME(40002, "Property 'name' in 'connectors' field is invalid"), + MEASURE_TYPE_DOES_NOT_MATCH(40001, "Property 'measure.type' does not match" + + "the type of measure in request body"), + INVALID_CONNECTOR_NAME(40002, "Property 'name' in 'connectors' " + + "field is invalid"), MISSING_METRIC_NAME(40003, "Missing property 'metricName'"), INVALID_JOB_NAME(40004, "Property 'job.name' is invalid"), MISSING_BASELINE_CONFIG(40005, "Missing 'as.baseline' config in 'data.segments'"), + INVALID_METRIC_RECORDS_OFFSET(40006, "Offset must not be less than zero"), + INVALID_METRIC_RECORDS_SIZE(40007, "Size must not be less than zero"), + INVALID_METRIC_VALUE_FORMAT(40008, "Metric value format is invalid"), + INVALID_MEASURE_ID(40009, "Property 'measure.id' is invalid"), + INVALID_CRON_EXPRESSION(40010, "Property 'cron.expression' is invalid"), + MEASURE_TYPE_DOES_NOT_SUPPORT(40011, "We don't support 
such measure type."), + JOB_TYPE_DOES_NOT_SUPPORT(40011, "We don't support such job type."), - STREAMING_JOB_IS_RUNNING(40012, "There is no need to start again as job is RUNNING."), - STREAMING_JOB_IS_STOPPED(40012, "There is no need to stop again as job is STOPPED."), + + STREAMING_JOB_IS_RUNNING(40012, "There is no need to start again " + + "as job is RUNNING."), + STREAMING_JOB_IS_STOPPED(40012, "There is no need to stop again " + + "as job is STOPPED."), JOB_IS_NOT_SCHEDULED(40013, "The job isn't scheduled."), + JOB_IS_NOT_IN_PAUSED_STATUS(40014, "The job isn't in paused status."), + JOB_IS_IN_PAUSED_STATUS(40015, "The job is already in paused status."), //404, "Not Found" MEASURE_ID_DOES_NOT_EXIST(40401, "Measure id does not exist"), + JOB_ID_DOES_NOT_EXIST(40402, "Job id does not exist"), + JOB_NAME_DOES_NOT_EXIST(40403, "Job name does not exist"), + NO_SUCH_JOB_ACTION(40404, "No such job action"), - JOB_KEY_DOES_NOT_EXIST(40405, "Job key which consists of group and name does not exist."), - ORGANIZATION_NAME_DOES_NOT_EXIST(40406, "Organization name does not exist"), + + JOB_KEY_DOES_NOT_EXIST(40405, "Job key which consists of " + + "group and name does not exist."), + ORGANIZATION_NAME_DOES_NOT_EXIST(40406, "Organization name " + + "does not exist"), + HDFS_FILE_NOT_EXIST(40407, "Hadoop data file not exist"), //409, "Conflict" MEASURE_NAME_ALREADY_EXIST(40901, "Measure name already exists"), + QUARTZ_JOB_ALREADY_EXIST(40902, "Quartz job already exist"); private final int code; @@ -70,7 +92,8 @@ public enum GriffinExceptionMessage { return message; } } - throw new IllegalArgumentException("No matching constant for [" + code + "]"); + throw new IllegalArgumentException("No matching constant for [" + + code + "]"); } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionResponse.java ---------------------------------------------------------------------- diff --git 
a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionResponse.java b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionResponse.java index db8f766..47e28da 100644 --- a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionResponse.java +++ b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionResponse.java @@ -37,7 +37,8 @@ public class GriffinExceptionResponse { private String path; - GriffinExceptionResponse(HttpStatus status, GriffinExceptionMessage message, String path) { + GriffinExceptionResponse(HttpStatus status, GriffinExceptionMessage message, + String path) { this.status = status.value(); this.error = status.getReasonPhrase(); this.code = Integer.toString(message.getCode()); @@ -45,7 +46,8 @@ public class GriffinExceptionResponse { this.path = path; } - GriffinExceptionResponse(HttpStatus status, String message, String path, String exception) { + GriffinExceptionResponse(HttpStatus status, String message, String path, + String exception) { this.status = status.value(); this.error = status.getReasonPhrase(); this.message = message; http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java index bc73cd8..5b2f816 100644 --- a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java +++ b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java @@ -51,7 +51,8 @@ import org.springframework.util.StringUtils; @Service public class BatchJobOperatorImpl implements JobOperator { - private static final Logger LOGGER = LoggerFactory.getLogger(BatchJobOperatorImpl.class); + private static final Logger LOGGER = LoggerFactory + 
.getLogger(BatchJobOperatorImpl.class); @Autowired private SchedulerFactoryBean factory; @@ -64,7 +65,8 @@ public class BatchJobOperatorImpl implements JobOperator { @Override @Transactional(rollbackFor = Exception.class) - public AbstractJob add(AbstractJob job, GriffinMeasure measure) throws Exception { + public AbstractJob add(AbstractJob job, GriffinMeasure measure) + throws Exception { validateParams(job, measure); String qName = jobService.getQuartzName(job); String qGroup = jobService.getQuartzGroup(); @@ -75,7 +77,9 @@ public class BatchJobOperatorImpl implements JobOperator { return job; } - private BatchJob genBatchJobBean(AbstractJob job, String qName, String qGroup) { + private BatchJob genBatchJobBean(AbstractJob job, + String qName, + String qGroup) { BatchJob batchJob = (BatchJob) job; batchJob.setMetricName(job.getJobName()); batchJob.setGroup(qGroup); @@ -96,17 +100,21 @@ public class BatchJobOperatorImpl implements JobOperator { String group = job.getGroup(); TriggerState state = getTriggerState(name, group); if (state == null) { - throw new GriffinException.BadRequestException(JOB_IS_NOT_SCHEDULED); + throw new GriffinException.BadRequestException( + JOB_IS_NOT_SCHEDULED); } - /* If job is not in paused state,we can't start it as it may be RUNNING. 
*/ + /* If job is not in paused state,we can't start it + as it may be RUNNING.*/ if (state != PAUSED) { - throw new GriffinException.BadRequestException(JOB_IS_NOT_IN_PAUSED_STATUS); + throw new GriffinException.BadRequestException + (JOB_IS_NOT_IN_PAUSED_STATUS); } JobKey jobKey = jobKey(name, group); try { factory.getScheduler().resumeJob(jobKey); } catch (SchedulerException e) { - throw new GriffinException.ServiceException("Failed to start job.", e); + throw new GriffinException.ServiceException( + "Failed to start job.", e); } } @@ -123,19 +131,23 @@ public class BatchJobOperatorImpl implements JobOperator { @Override - public JobHealth getHealth(JobHealth jobHealth, AbstractJob job) throws SchedulerException { - List<? extends Trigger> triggers = jobService.getTriggers(job.getName(), job.getGroup()); + public JobHealth getHealth(JobHealth jobHealth, AbstractJob job) + throws SchedulerException { + List<? extends Trigger> triggers = jobService + .getTriggers(job.getName(), job.getGroup()); if (!CollectionUtils.isEmpty(triggers)) { jobHealth.setJobCount(jobHealth.getJobCount() + 1); if (jobService.isJobHealthy(job.getId())) { - jobHealth.setHealthyJobCount(jobHealth.getHealthyJobCount() + 1); + jobHealth.setHealthyJobCount( + jobHealth.getHealthyJobCount() + 1); } } return jobHealth; } @Override - public JobState getState(AbstractJob job, String action) throws SchedulerException { + public JobState getState(AbstractJob job, String action) + throws SchedulerException { JobState jobState = new JobState(); Scheduler scheduler = factory.getScheduler(); if (job.getGroup() == null || job.getName() == null) { @@ -150,24 +162,30 @@ public class BatchJobOperatorImpl implements JobOperator { return jobState; } - private void setTriggerTime(AbstractJob job, JobState jobState) throws SchedulerException { - List<? 
extends Trigger> triggers = jobService.getTriggers(job.getName(), job.getGroup()); - // If triggers are empty, in Griffin it means job is completed whose trigger state is NONE or not scheduled. + private void setTriggerTime(AbstractJob job, JobState jobState) + throws SchedulerException { + List<? extends Trigger> triggers = jobService + .getTriggers(job.getName(), job.getGroup()); + // If triggers are empty, in Griffin it means job is completed whose + // trigger state is NONE or not scheduled. if (CollectionUtils.isEmpty(triggers)) { return; } Trigger trigger = triggers.get(0); Date nextFireTime = trigger.getNextFireTime(); Date previousFireTime = trigger.getPreviousFireTime(); - jobState.setNextFireTime(nextFireTime != null ? nextFireTime.getTime() : -1); - jobState.setPreviousFireTime(previousFireTime != null ? previousFireTime.getTime() : -1); + jobState.setNextFireTime(nextFireTime != null ? + nextFireTime.getTime() : -1); + jobState.setPreviousFireTime(previousFireTime != null ? + previousFireTime.getTime() : -1); } /** * only PAUSED state of job can be started * * @param state job state - * @return true: job can be started, false: job is running which cannot be started + * @return true: job can be started, false: job is running which cannot be + * started */ private boolean getStartStatus(TriggerState state) { return state == PAUSED; @@ -177,7 +195,8 @@ public class BatchJobOperatorImpl implements JobOperator { * only NORMAL or BLOCKED state of job can be started * * @param state job state - * @return true: job can be stopped, false: job is running which cannot be stopped + * @return true: job can be stopped, false: job is running which cannot be + * stopped */ private boolean getStopStatus(TriggerState state) { return state == NORMAL || state == BLOCKED; @@ -186,7 +205,8 @@ public class BatchJobOperatorImpl implements JobOperator { private TriggerState getTriggerState(String name, String group) { try { - List<? 
extends Trigger> triggers = jobService.getTriggers(name, group); + List<? extends Trigger> triggers = jobService.getTriggers(name, + group); if (CollectionUtils.isEmpty(triggers)) { return null; } @@ -194,7 +214,8 @@ public class BatchJobOperatorImpl implements JobOperator { return factory.getScheduler().getTriggerState(key); } catch (SchedulerException e) { LOGGER.error("Failed to delete job", e); - throw new GriffinException.ServiceException("Failed to delete job", e); + throw new GriffinException + .ServiceException("Failed to delete job", e); } } @@ -202,7 +223,8 @@ public class BatchJobOperatorImpl implements JobOperator { /** * @param job griffin job - * @param delete if job needs to be deleted,set isNeedDelete true,otherwise it just will be paused. + * @param delete if job needs to be deleted,set isNeedDelete true,otherwise + * it just will be paused. */ private void pauseJob(BatchJob job, boolean delete) { try { @@ -212,7 +234,8 @@ public class BatchJobOperatorImpl implements JobOperator { batchJobRepo.save(job); } catch (Exception e) { LOGGER.error("Job schedule happens exception.", e); - throw new GriffinException.ServiceException("Job schedule happens exception.", e); + throw new GriffinException.ServiceException("Job schedule " + + "happens exception.", e); } } @@ -220,7 +243,8 @@ public class BatchJobOperatorImpl implements JobOperator { List<JobInstanceBean> instances = instanceRepo.findByJobId(job.getId()); for (JobInstanceBean instance : instances) { if (!instance.isPredicateDeleted()) { - deleteJob(instance.getPredicateGroup(), instance.getPredicateName()); + deleteJob(instance.getPredicateGroup(), instance + .getPredicateName()); instance.setPredicateDeleted(true); if (instance.getState().equals(LivySessionStates.State.FINDING)) { instance.setState(LivySessionStates.State.NOT_FOUND); @@ -233,7 +257,8 @@ public class BatchJobOperatorImpl implements JobOperator { Scheduler scheduler = factory.getScheduler(); JobKey jobKey = new JobKey(name, group); 
if (!scheduler.checkExists(jobKey)) { - LOGGER.info("Job({},{}) does not exist.", jobKey.getGroup(), jobKey.getName()); + LOGGER.info("Job({},{}) does not exist.", jobKey.getGroup(), jobKey + .getName()); return; } scheduler.deleteJob(jobKey); @@ -247,8 +272,10 @@ public class BatchJobOperatorImpl implements JobOperator { Scheduler scheduler = factory.getScheduler(); JobKey jobKey = new JobKey(name, group); if (!scheduler.checkExists(jobKey)) { - LOGGER.warn("Job({},{}) does not exist.", jobKey.getGroup(), jobKey.getName()); - throw new GriffinException.NotFoundException(JOB_KEY_DOES_NOT_EXIST); + LOGGER.warn("Job({},{}) does not exist.", jobKey.getGroup(), jobKey + .getName()); + throw new GriffinException.NotFoundException + (JOB_KEY_DOES_NOT_EXIST); } scheduler.pauseJob(jobKey); } @@ -267,7 +294,8 @@ public class BatchJobOperatorImpl implements JobOperator { return pauseStatus; } - private boolean pauseJobInstance(JobInstanceBean instance, List<JobInstanceBean> deletedInstances) { + private boolean pauseJobInstance(JobInstanceBean instance, + List<JobInstanceBean> deletedInstances) { boolean status = true; String pGroup = instance.getPredicateGroup(); String pName = instance.getPredicateName(); @@ -278,7 +306,8 @@ public class BatchJobOperatorImpl implements JobOperator { deletedInstances.add(instance); } } catch (SchedulerException e) { - LOGGER.error("Failed to pause predicate job({},{}).", pGroup, pName); + LOGGER.error("Failed to pause predicate job({},{}).", pGroup, + pName); status = false; } return status; @@ -289,14 +318,17 @@ public class BatchJobOperatorImpl implements JobOperator { throw new GriffinException.BadRequestException(INVALID_JOB_NAME); } if (!isValidCronExpression(job.getCronExpression())) { - throw new GriffinException.BadRequestException(INVALID_CRON_EXPRESSION); + throw new GriffinException.BadRequestException + (INVALID_CRON_EXPRESSION); } if (!isValidBaseLine(job.getSegments())) { - throw new 
GriffinException.BadRequestException(MISSING_BASELINE_CONFIG); + throw new GriffinException.BadRequestException + (MISSING_BASELINE_CONFIG); } List<String> names = getConnectorNames(measure); if (!isValidConnectorNames(job.getSegments(), names)) { - throw new GriffinException.BadRequestException(INVALID_CONNECTOR_NAME); + throw new GriffinException.BadRequestException + (INVALID_CONNECTOR_NAME); } } @@ -319,24 +351,29 @@ public class BatchJobOperatorImpl implements JobOperator { return true; } } - LOGGER.warn("Please set segment timestamp baseline in as.baseline field."); + LOGGER.warn("Please set segment timestamp baseline " + + "in as.baseline field."); return false; } - private boolean isValidConnectorNames(List<JobDataSegment> segments, List<String> names) { + private boolean isValidConnectorNames(List<JobDataSegment> segments, + List<String> names) { assert segments != null; Set<String> sets = new HashSet<>(); for (JobDataSegment segment : segments) { String dcName = segment.getDataConnectorName(); sets.add(dcName); - boolean exist = names.stream().anyMatch(name -> name.equals(dcName)); + boolean exist = names.stream().anyMatch(name -> name.equals + (dcName)); if (!exist) { - LOGGER.warn("Param {} is a illegal string. Please input one of strings in {}.", dcName, names); + LOGGER.warn("Param {} is a illegal string. 
" + + "Please input one of strings in {}.", dcName, names); return false; } } if (sets.size() < segments.size()) { - LOGGER.warn("Connector names in job data segment cannot duplicate."); + LOGGER.warn("Connector names in job data segment " + + "cannot duplicate."); return false; } return true; http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java b/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java index 35f981f..703a837 100644 --- a/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java +++ b/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java @@ -32,7 +32,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class FileExistPredicator implements Predicator { - private static final Logger LOGGER = LoggerFactory.getLogger(FileExistPredicator.class); + private static final Logger LOGGER = LoggerFactory + .getLogger(FileExistPredicator.class); private static final String PREDICT_PATH = "path"; private static final String PREDICT_ROOT_PATH = "root.path"; @@ -48,12 +49,15 @@ public class FileExistPredicator implements Predicator { Map<String, Object> config = predicate.getConfigMap(); String[] paths = null; String rootPath = null; - if (config != null && !StringUtils.isEmpty((String) config.get(PREDICT_PATH))) { - paths = ((String) config.get(PREDICT_PATH)).split(PATH_CONNECTOR_CHARACTER); + if (config != null && !StringUtils.isEmpty((String) config.get + (PREDICT_PATH))) { + paths = ((String) config.get(PREDICT_PATH)).split + (PATH_CONNECTOR_CHARACTER); rootPath = (String) config.get(PREDICT_ROOT_PATH); } if (ArrayUtils.isEmpty(paths) || StringUtils.isEmpty(rootPath)) { - LOGGER.error("Predicate path is null.Please check predicates config 
root.path and path."); + LOGGER.error("Predicate path is null.Please check predicates " + + "config root.path and path."); throw new NullPointerException(); } for (String path : paths) { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/job/JobController.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobController.java b/service/src/main/java/org/apache/griffin/core/job/JobController.java index 64b8e42..f4ee791 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobController.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobController.java @@ -48,7 +48,8 @@ public class JobController { private JobService jobService; @RequestMapping(value = "/jobs", method = RequestMethod.GET) - public List<AbstractJob> getJobs(@RequestParam(value = "type", defaultValue = "") String type) { + public List<AbstractJob> getJobs(@RequestParam(value = "type", + defaultValue = "") String type) { return jobService.getAliveJobs(type); } @@ -65,24 +66,31 @@ public class JobController { @RequestMapping(value = "/jobs/{id}", method = RequestMethod.PUT) @ResponseStatus(HttpStatus.OK) - public AbstractJob onActions(@PathVariable("id") Long jobId, @RequestParam String action) throws Exception { + public AbstractJob onActions( + @PathVariable("id") Long jobId, + @RequestParam String action) throws Exception { return jobService.onAction(jobId, action); } @RequestMapping(value = "/jobs", method = RequestMethod.DELETE) @ResponseStatus(HttpStatus.NO_CONTENT) - public void deleteJob(@RequestParam("jobName") String jobName) throws SchedulerException { + public void deleteJob(@RequestParam("jobName") String jobName) + throws SchedulerException { jobService.deleteJob(jobName); } @RequestMapping(value = "/jobs/{id}", method = RequestMethod.DELETE) @ResponseStatus(HttpStatus.NO_CONTENT) - public void 
deleteJob(@PathVariable("id") Long id) throws SchedulerException { + public void deleteJob(@PathVariable("id") Long id) + throws SchedulerException { jobService.deleteJob(id); } @RequestMapping(value = "/jobs/instances", method = RequestMethod.GET) - public List<JobInstanceBean> findInstancesOfJob(@RequestParam("jobId") Long id, @RequestParam("page") int page, @RequestParam("size") int size) { + public List<JobInstanceBean> findInstancesOfJob( + @RequestParam("jobId") Long id, + @RequestParam("page") int page, + @RequestParam("size") int size) { return jobService.findInstancesOfJob(id, page, size); } @@ -92,11 +100,16 @@ public class JobController { } @RequestMapping(path = "/jobs/download", method = RequestMethod.GET) - public ResponseEntity<Resource> download(@RequestParam("jobName") String jobName, @RequestParam("ts") long timestamp) throws Exception { + public ResponseEntity<Resource> download( + @RequestParam("jobName") String jobName, + @RequestParam("ts") long timestamp) + throws Exception { String path = jobService.getJobHdfsSinksPath(jobName, timestamp); - InputStreamResource resource = new InputStreamResource(FSUtil.getMissSampleInputStream(path)); + InputStreamResource resource = new InputStreamResource( + FSUtil.getMissSampleInputStream(path)); return ResponseEntity.ok(). 
- header("content-disposition", "attachment; filename = sampleMissingData.json") + header("content-disposition", + "attachment; filename = sampleMissingData.json") .contentType(MediaType.APPLICATION_OCTET_STREAM) .body(resource); } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/job/JobInstance.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java index ab67c81..afb25d8 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java @@ -78,7 +78,8 @@ import org.springframework.transaction.annotation.Transactional; @PersistJobDataAfterExecution @DisallowConcurrentExecution public class JobInstance implements Job { - private static final Logger LOGGER = LoggerFactory.getLogger(JobInstance.class); + private static final Logger LOGGER = LoggerFactory + .getLogger(JobInstance.class); public static final String MEASURE_KEY = "measure"; public static final String PREDICATES_KEY = "predicts"; public static final String PREDICATE_JOB_NAME = "predicateJobName"; @@ -86,7 +87,8 @@ public class JobInstance implements Job { static final String PATH_CONNECTOR_CHARACTER = ","; public static final String INTERVAL = "interval"; public static final String REPEAT = "repeat"; - public static final String CHECK_DONEFILE_SCHEDULE = "checkdonefile.schedule"; + public static final String CHECK_DONEFILE_SCHEDULE = + "checkdonefile.schedule"; @Autowired private SchedulerFactoryBean factory; @@ -104,7 +106,6 @@ public class JobInstance implements Job { private List<SegmentPredicate> mPredicates; private Long jobStartTime; - @Override @Transactional public void execute(JobExecutionContext context) { @@ -117,7 +118,8 @@ public class JobInstance implements Job { } } - 
private void initParam(JobExecutionContext context) throws SchedulerException { + private void initParam(JobExecutionContext context) + throws SchedulerException { mPredicates = new ArrayList<>(); JobDetail jobDetail = context.getJobDetail(); Long jobId = jobDetail.getJobDataMap().getLong(GRIFFIN_JOB_ID); @@ -128,20 +130,24 @@ public class JobInstance implements Job { } @SuppressWarnings("unchecked") - private void setJobStartTime(JobDetail jobDetail) throws SchedulerException { + private void setJobStartTime(JobDetail jobDetail) + throws SchedulerException { Scheduler scheduler = factory.getScheduler(); JobKey jobKey = jobDetail.getKey(); - List<Trigger> triggers = (List<Trigger>) scheduler.getTriggersOfJob(jobKey); + List<Trigger> triggers = + (List<Trigger>) scheduler.getTriggersOfJob(jobKey); Date triggerTime = triggers.get(0).getPreviousFireTime(); jobStartTime = triggerTime.getTime(); } - private void setSourcesPartitionsAndPredicates(List<DataSource> sources) throws Exception { + private void setSourcesPartitionsAndPredicates(List<DataSource> sources) + throws Exception { boolean isFirstBaseline = true; for (JobDataSegment jds : job.getSegments()) { if (jds.isAsTsBaseline() && isFirstBaseline) { - Long tsOffset = TimeUtil.str2Long(jds.getSegmentRange().getBegin()); + Long tsOffset = TimeUtil.str2Long( + jds.getSegmentRange().getBegin()); measure.setTimestamp(jobStartTime + tsOffset); isFirstBaseline = false; } @@ -151,7 +157,8 @@ public class JobInstance implements Job { } } - private void setDataSourcePartitions(JobDataSegment jds, DataSource ds) throws Exception { + private void setDataSourcePartitions(JobDataSegment jds, DataSource ds) + throws Exception { List<DataConnector> connectors = ds.getConnectors(); for (DataConnector dc : connectors) { setDataConnectorPartitions(jds, dc); @@ -159,7 +166,9 @@ public class JobInstance implements Job { } - private void setDataConnectorPartitions(JobDataSegment jds, DataConnector dc) throws Exception { + private 
void setDataConnectorPartitions( + JobDataSegment jds, + DataConnector dc) throws Exception { String dcName = jds.getDataConnectorName(); if (dcName.equals(dc.getName())) { Long[] sampleTs = genSampleTs(jds.getSegmentRange(), dc); @@ -179,7 +188,8 @@ public class JobInstance implements Job { Long offset = TimeUtil.str2Long(segRange.getBegin()); Long range = TimeUtil.str2Long(segRange.getLength()); String unit = dc.getDataUnit(); - Long dataUnit = TimeUtil.str2Long(StringUtils.isEmpty(unit) ? dc.getDefaultDataUnit() : unit); + Long dataUnit = TimeUtil.str2Long(StringUtils.isEmpty(unit) ? dc + .getDefaultDataUnit() : unit); //offset usually is negative Long dataStartTime = jobStartTime + offset; if (range < 0) { @@ -206,7 +216,9 @@ public class JobInstance implements Job { private void setConnectorPredicates(DataConnector dc, Long[] sampleTs) { List<SegmentPredicate> predicates = dc.getPredicates(); for (SegmentPredicate predicate : predicates) { - genConfMap(predicate.getConfigMap(), sampleTs, dc.getDataTimeZone()); + genConfMap(predicate.getConfigMap(), + sampleTs, + dc.getDataTimeZone()); //Do not forget to update origin string config predicate.setConfigMap(predicate.getConfigMap()); mPredicates.add(predicate); @@ -222,11 +234,15 @@ public class JobInstance implements Job { /** * @param conf config map * @param sampleTs collection of data split start timestamp - * @return all config data combine,like {"where": "year=2017 AND month=11 AND dt=15 AND hour=09,year=2017 AND month=11 AND dt=15 AND hour=10"} - * or like {"path": "/year=2017/month=11/dt=15/hour=09/_DONE,/year=2017/month=11/dt=15/hour=10/_DONE"} + * @return all config data combine,like {"where": "year=2017 AND month=11 + * AND dt=15 AND hour=09,year=2017 AND month=11 AND + * dt=15 AND hour=10"} + * or like {"path": "/year=2017/month=11/dt=15/hour=09/_DONE + * ,/year=2017/month=11/dt=15/hour=10/_DONE"} */ - private void genConfMap(Map<String, Object> conf, Long[] sampleTs, String timezone) { + private void 
genConfMap(Map<String, Object> conf, Long[] sampleTs, String + timezone) { if (conf == null) { LOGGER.warn("Predicate config is null."); return; @@ -240,9 +256,11 @@ public class JobInstance implements Job { continue; } for (Long timestamp : sampleTs) { - set.add(TimeUtil.format(value, timestamp, getTimeZone(timezone))); + set.add(TimeUtil.format(value, timestamp, + getTimeZone(timezone))); } - conf.put(entry.getKey(), StringUtils.join(set, PATH_CONNECTOR_CHARACTER)); + conf.put(entry.getKey(), StringUtils.join(set, + PATH_CONNECTOR_CHARACTER)); } } } @@ -255,16 +273,20 @@ public class JobInstance implements Job { } @SuppressWarnings("unchecked") - private void createJobInstance(Map<String, Object> confMap) throws Exception { + private void createJobInstance(Map<String, Object> confMap) + throws Exception { confMap = checkConfMap(confMap != null ? confMap : new HashMap<>()); - Map<String, Object> config = (Map<String, Object>) confMap.get(CHECK_DONEFILE_SCHEDULE); + Map<String, Object> config = (Map<String, Object>) confMap + .get(CHECK_DONEFILE_SCHEDULE); Long interval = TimeUtil.str2Long((String) config.get(INTERVAL)); Integer repeat = Integer.valueOf(config.get(REPEAT).toString()); String groupName = "PG"; - String jobName = job.getJobName() + "_predicate_" + System.currentTimeMillis(); + String jobName = job.getJobName() + "_predicate_" + System + .currentTimeMillis(); TriggerKey tk = triggerKey(jobName, groupName); if (factory.getScheduler().checkExists(tk)) { - throw new GriffinException.ConflictException(QUARTZ_JOB_ALREADY_EXIST); + throw new GriffinException.ConflictException + (QUARTZ_JOB_ALREADY_EXIST); } saveJobInstance(jobName, groupName); createJobInstance(tk, interval, repeat, jobName); @@ -272,7 +294,8 @@ public class JobInstance implements Job { @SuppressWarnings("unchecked") Map<String, Object> checkConfMap(Map<String, Object> confMap) { - Map<String, Object> config = (Map<String, Object>) confMap.get(CHECK_DONEFILE_SCHEDULE); + Map<String, Object> 
config = (Map<String, Object>) confMap.get + (CHECK_DONEFILE_SCHEDULE); String interval = env.getProperty("predicate.job.interval"); interval = interval != null ? interval : "5m"; String repeat = env.getProperty("predicate.job.repeat.count"); @@ -294,30 +317,38 @@ public class JobInstance implements Job { } private void saveJobInstance(String pName, String pGroup) { - ProcessType type = measure.getProcessType() == BATCH ? BATCH : STREAMING; + ProcessType type = measure.getProcessType() == BATCH ? BATCH : + STREAMING; Long tms = System.currentTimeMillis(); String expired = env.getProperty("jobInstance.expired.milliseconds"); - Long expireTms = Long.valueOf(expired != null ? expired : "604800000") + tms; - JobInstanceBean instance = new JobInstanceBean(FINDING, pName, pGroup, tms, expireTms, type); + Long expireTms = Long.valueOf(expired != null ? expired : "604800000") + + tms; + JobInstanceBean instance = new JobInstanceBean(FINDING, pName, pGroup, + tms, expireTms, type); instance.setJob(job); instanceRepo.save(instance); } - private void createJobInstance(TriggerKey tk, Long interval, Integer repeatCount, String pJobName) throws Exception { + private void createJobInstance(TriggerKey tk, Long interval, Integer + repeatCount, String pJobName) throws Exception { JobDetail jobDetail = addJobDetail(tk, pJobName); - Trigger trigger = genTriggerInstance(tk, jobDetail, interval, repeatCount); + Trigger trigger = genTriggerInstance(tk, jobDetail, interval, + repeatCount); factory.getScheduler().scheduleJob(trigger); } - private Trigger genTriggerInstance(TriggerKey tk, JobDetail jd, Long interval, Integer repeatCount) { + private Trigger genTriggerInstance(TriggerKey tk, JobDetail jd, Long + interval, Integer repeatCount) { return newTrigger().withIdentity(tk).forJob(jd).startNow() - .withSchedule(simpleSchedule().withIntervalInMilliseconds(interval).withRepeatCount(repeatCount)) + .withSchedule(simpleSchedule().withIntervalInMilliseconds + 
(interval).withRepeatCount(repeatCount)) .build(); } - private JobDetail addJobDetail(TriggerKey tk, String pJobName) throws SchedulerException, IOException { + private JobDetail addJobDetail(TriggerKey tk, String pJobName) + throws SchedulerException, IOException { Scheduler scheduler = factory.getScheduler(); JobKey jobKey = jobKey(tk.getName(), tk.getGroup()); JobDetail jobDetail; @@ -335,7 +366,8 @@ public class JobInstance implements Job { return jobDetail; } - private void setJobDataMap(JobDetail jobDetail, String pJobName) throws IOException { + private void setJobDataMap(JobDetail jobDetail, String pJobName) + throws IOException { JobDataMap dataMap = jobDetail.getJobDataMap(); preProcessMeasure(); String result = toJson(measure); http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/job/JobOperator.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobOperator.java b/service/src/main/java/org/apache/griffin/core/job/JobOperator.java index 7fdbe7d..81c3b17 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobOperator.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobOperator.java @@ -26,7 +26,8 @@ import org.apache.griffin.core.measure.entity.GriffinMeasure; import org.quartz.SchedulerException; public interface JobOperator { - AbstractJob add(AbstractJob job, GriffinMeasure measure) throws Exception; + AbstractJob add(AbstractJob job, GriffinMeasure measure) + throws Exception; void start(AbstractJob job) throws Exception; @@ -34,7 +35,9 @@ public interface JobOperator { void delete(AbstractJob job) throws SchedulerException; - JobHealth getHealth(JobHealth jobHealth, AbstractJob job) throws SchedulerException; + JobHealth getHealth(JobHealth jobHealth, AbstractJob job) + throws SchedulerException; - JobState getState(AbstractJob job, String action) throws SchedulerException; + 
JobState getState(AbstractJob job, String action) + throws SchedulerException; } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java index 5617065..ac14461 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java @@ -101,7 +101,8 @@ import org.springframework.web.client.RestTemplate; @Service public class JobServiceImpl implements JobService { - private static final Logger LOGGER = LoggerFactory.getLogger(JobServiceImpl.class); + private static final Logger LOGGER = LoggerFactory + .getLogger(JobServiceImpl.class); public static final String GRIFFIN_JOB_ID = "griffinJobId"; private static final int MAX_PAGE_SIZE = 1024; private static final int DEFAULT_PAGE_SIZE = 10; @@ -156,7 +157,8 @@ public class JobServiceImpl implements JobService { } } catch (SchedulerException e) { LOGGER.error("Failed to get RUNNING jobs.", e); - throw new GriffinException.ServiceException("Failed to get RUNNING jobs.", e); + throw new GriffinException + .ServiceException("Failed to get RUNNING jobs.", e); } return dataList; } @@ -174,7 +176,8 @@ public class JobServiceImpl implements JobService { AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false); if (job == null) { LOGGER.warn("Job id {} does not exist.", jobId); - throw new GriffinException.NotFoundException(JOB_ID_DOES_NOT_EXIST); + throw new GriffinException + .NotFoundException(JOB_ID_DOES_NOT_EXIST); } return job; } @@ -194,7 +197,8 @@ public class JobServiceImpl implements JobService { return job; } - private void doAction(String action, AbstractJob job, JobOperator op) throws Exception { + private void 
doAction(String action, AbstractJob job, JobOperator op) + throws Exception { switch (action) { case START: op.start(job); @@ -203,7 +207,8 @@ public class JobServiceImpl implements JobService { op.stop(job); break; default: - throw new GriffinException.NotFoundException(NO_SUCH_JOB_ACTION); + throw new GriffinException + .NotFoundException(NO_SUCH_JOB_ACTION); } } @@ -233,7 +238,8 @@ public class JobServiceImpl implements JobService { List<AbstractJob> jobs = jobRepo.findByJobNameAndDeleted(name, false); if (CollectionUtils.isEmpty(jobs)) { LOGGER.warn("There is no job with '{}' name.", name); - throw new GriffinException.NotFoundException(JOB_NAME_DOES_NOT_EXIST); + throw new GriffinException + .NotFoundException(JOB_NAME_DOES_NOT_EXIST); } for (AbstractJob job : jobs) { JobOperator op = getJobOperator(job); @@ -242,16 +248,22 @@ public class JobServiceImpl implements JobService { } @Override - public List<JobInstanceBean> findInstancesOfJob(Long jobId, int page, int size) { + public List<JobInstanceBean> findInstancesOfJob( + Long jobId, + int page, + int size) { AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false); if (job == null) { LOGGER.warn("Job id {} does not exist.", jobId); - throw new GriffinException.NotFoundException(JOB_ID_DOES_NOT_EXIST); + throw new GriffinException + .NotFoundException(JOB_ID_DOES_NOT_EXIST); } size = size > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : size; size = size <= 0 ? DEFAULT_PAGE_SIZE : size; - Pageable pageable = new PageRequest(page, size, Sort.Direction.DESC, "tms"); - List<JobInstanceBean> instances = instanceRepo.findByJobId(jobId, pageable); + Pageable pageable = new PageRequest(page, size, + Sort.Direction.DESC, "tms"); + List<JobInstanceBean> instances = instanceRepo.findByJobId(jobId, + pageable); return updateState(instances); } @@ -266,7 +278,8 @@ public class JobServiceImpl implements JobService { } /** - * a job is regard as healthy job when its latest instance is in healthy state. 
+ * a job is regard as healthy job when its latest instance is in healthy + * state. * * @return job healthy statistics */ @@ -280,7 +293,8 @@ public class JobServiceImpl implements JobService { jobHealth = op.getHealth(jobHealth, job); } catch (SchedulerException e) { LOGGER.error("Job schedule exception. {}", e); - throw new GriffinException.ServiceException("Fail to Get HealthInfo", e); + throw new GriffinException + .ServiceException("Fail to Get HealthInfo", e); } } @@ -290,7 +304,9 @@ public class JobServiceImpl implements JobService { @Scheduled(fixedDelayString = "${jobInstance.expired.milliseconds}") public void deleteExpiredJobInstance() { Long timeMills = System.currentTimeMillis(); - List<JobInstanceBean> instances = instanceRepo.findByExpireTmsLessThanEqual(timeMills); + List<JobInstanceBean> instances = instanceRepo + .findByExpireTmsLessThanEqual + (timeMills); if (!batchJobOp.pauseJobInstances(instances)) { LOGGER.error("Pause job failure."); return; @@ -312,7 +328,8 @@ public class JobServiceImpl implements JobService { } else if (job instanceof StreamingJob) { return streamingJobOp; } - throw new GriffinException.BadRequestException(JOB_TYPE_DOES_NOT_SUPPORT); + throw new GriffinException.BadRequestException + (JOB_TYPE_DOES_NOT_SUPPORT); } private JobOperator getJobOperator(ProcessType type) { @@ -321,18 +338,22 @@ public class JobServiceImpl implements JobService { } else if (type == STREAMING) { return streamingJobOp; } - throw new GriffinException.BadRequestException(MEASURE_TYPE_DOES_NOT_SUPPORT); + throw new GriffinException.BadRequestException + (MEASURE_TYPE_DOES_NOT_SUPPORT); } - TriggerKey getTriggerKeyIfValid(String qName, String qGroup) throws SchedulerException { + TriggerKey getTriggerKeyIfValid(String qName, String qGroup) throws + SchedulerException { TriggerKey triggerKey = triggerKey(qName, qGroup); if (factory.getScheduler().checkExists(triggerKey)) { - throw new GriffinException.ConflictException(QUARTZ_JOB_ALREADY_EXIST); + 
throw new GriffinException.ConflictException + (QUARTZ_JOB_ALREADY_EXIST); } return triggerKey; } - List<? extends Trigger> getTriggers(String name, String group) throws SchedulerException { + List<? extends Trigger> getTriggers(String name, String group) throws + SchedulerException { if (name == null || group == null) { return null; } @@ -341,7 +362,8 @@ public class JobServiceImpl implements JobService { return scheduler.getTriggersOfJob(jobKey); } - private JobState genJobState(AbstractJob job, String action) throws SchedulerException { + private JobState genJobState(AbstractJob job, String action) throws + SchedulerException { JobOperator op = getJobOperator(job); JobState state = op.getState(job, action); job.setJobState(state); @@ -352,7 +374,8 @@ public class JobServiceImpl implements JobService { return genJobState(job, null); } - void addJob(TriggerKey tk, AbstractJob job, ProcessType type) throws Exception { + void addJob(TriggerKey tk, AbstractJob job, ProcessType type) throws + Exception { JobDetail jobDetail = addJobDetail(tk, job); Trigger trigger = genTriggerInstance(tk, jobDetail, job, type); factory.getScheduler().scheduleJob(trigger); @@ -381,27 +404,35 @@ public class JobServiceImpl implements JobService { private GriffinMeasure getMeasureIfValid(Long measureId) { - GriffinMeasure measure = measureRepo.findByIdAndDeleted(measureId, false); + GriffinMeasure measure = measureRepo.findByIdAndDeleted(measureId, + false); if (measure == null) { - LOGGER.warn("The measure id {} isn't valid. Maybe it doesn't exist or is external measure type.", measureId); + LOGGER.warn("The measure id {} isn't valid. 
Maybe it doesn't " + + "exist or is external measure type.", + measureId); throw new GriffinException.BadRequestException(INVALID_MEASURE_ID); } return measure; } - private Trigger genTriggerInstance(TriggerKey tk, JobDetail jd, AbstractJob job, ProcessType type) { + private Trigger genTriggerInstance(TriggerKey tk, JobDetail jd, AbstractJob + job, ProcessType type) { TriggerBuilder builder = newTrigger().withIdentity(tk).forJob(jd); if (type == BATCH) { TimeZone timeZone = getTimeZone(job.getTimeZone()); - return builder.withSchedule(cronSchedule(job.getCronExpression()).inTimeZone(timeZone)).build(); + return builder.withSchedule(cronSchedule(job.getCronExpression()) + .inTimeZone(timeZone)).build(); } else if (type == STREAMING) { - return builder.startNow().withSchedule(simpleSchedule().withRepeatCount(0)).build(); + return builder.startNow().withSchedule(simpleSchedule() + .withRepeatCount(0)).build(); } - throw new GriffinException.BadRequestException(JOB_TYPE_DOES_NOT_SUPPORT); + throw new GriffinException.BadRequestException + (JOB_TYPE_DOES_NOT_SUPPORT); } - private JobDetail addJobDetail(TriggerKey triggerKey, AbstractJob job) throws SchedulerException { + private JobDetail addJobDetail(TriggerKey triggerKey, AbstractJob job) + throws SchedulerException { Scheduler scheduler = factory.getScheduler(); JobKey jobKey = jobKey(triggerKey.getName(), triggerKey.getGroup()); JobDetail jobDetail; @@ -409,7 +440,8 @@ public class JobServiceImpl implements JobService { if (isJobKeyExist) { jobDetail = scheduler.getJobDetail(jobKey); } else { - jobDetail = newJob(JobInstance.class).storeDurably().withIdentity(jobKey).build(); + jobDetail = newJob(JobInstance.class).storeDurably().withIdentity + (jobKey).build(); } setJobDataMap(jobDetail, job); scheduler.addJob(jobDetail, isJobKeyExist); @@ -429,8 +461,10 @@ public class JobServiceImpl implements JobService { * * @param measureId measure id */ - public void deleteJobsRelateToMeasure(Long measureId) throws 
SchedulerException { - List<AbstractJob> jobs = jobRepo.findByMeasureIdAndDeleted(measureId, false); + public void deleteJobsRelateToMeasure(Long measureId) throws + SchedulerException { + List<AbstractJob> jobs = jobRepo.findByMeasureIdAndDeleted(measureId, + false); if (CollectionUtils.isEmpty(jobs)) { LOGGER.info("Measure id {} has no related jobs.", measureId); return; @@ -443,7 +477,8 @@ public class JobServiceImpl implements JobService { @Scheduled(fixedDelayString = "${jobInstance.fixedDelay.in.milliseconds}") public void syncInstancesOfAllJobs() { - LivySessionStates.State[] states = {STARTING, NOT_STARTED, RECOVERING, IDLE, RUNNING, BUSY}; + LivySessionStates.State[] states = {STARTING, NOT_STARTED, RECOVERING, + IDLE, RUNNING, BUSY}; List<JobInstanceBean> beans = instanceRepo.findByActiveState(states); for (JobInstanceBean jobInstance : beans) { syncInstancesOfJob(jobInstance); @@ -451,7 +486,8 @@ public class JobServiceImpl implements JobService { } /** - * call livy to update part of job instance table data associated with group and jobName in mysql. + * call livy to update part of job instance table data associated with group + * and jobName in mysql. 
* * @param instance job instance livy info */ @@ -459,17 +495,22 @@ public class JobServiceImpl implements JobService { if (instance.getSessionId() == null) { return; } - String uri = env.getProperty("livy.uri") + "/" + instance.getSessionId(); - TypeReference<HashMap<String, Object>> type = new TypeReference<HashMap<String, Object>>() { - }; + String uri = env.getProperty("livy.uri") + "/" + + instance.getSessionId(); + TypeReference<HashMap<String, Object>> type = + new TypeReference<HashMap<String, Object>>() { + }; try { String resultStr = restTemplate.getForObject(uri, String.class); - HashMap<String, Object> resultMap = JsonUtil.toEntity(resultStr, type); + HashMap<String, Object> resultMap = JsonUtil.toEntity(resultStr, + type); setJobInstanceIdAndUri(instance, resultMap); } catch (ResourceAccessException e) { - LOGGER.error("Your url may be wrong. Please check {}.\n {}", uri, e.getMessage()); + LOGGER.error("Your url may be wrong. Please check {}.\n {}", uri, e + .getMessage()); } catch (HttpClientErrorException e) { - LOGGER.warn("sessionId({}) appId({}) {}.", instance.getSessionId(), instance.getAppId(), e.getMessage()); + LOGGER.warn("sessionId({}) appId({}) {}.", instance.getSessionId(), + instance.getAppId(), e.getMessage()); setStateByYarn(instance, e); } catch (Exception e) { LOGGER.error(e.getMessage()); @@ -477,11 +518,14 @@ public class JobServiceImpl implements JobService { } - private void setStateByYarn(JobInstanceBean instance, HttpClientErrorException e) { + private void setStateByYarn(JobInstanceBean instance, + HttpClientErrorException e) { if (!checkStatus(instance, e)) { int code = e.getStatusCode().value(); - boolean match = (code == 400 || code == 404) && instance.getAppId() != null; - //this means your url is correct,but your param is wrong or livy session may be overdue. 
+ boolean match = (code == 400 || code == 404) + && instance.getAppId() != null; + //this means your url is correct,but your param is wrong or livy + //session may be overdue. if (match) { setStateByYarn(instance); } @@ -490,21 +534,26 @@ public class JobServiceImpl implements JobService { } /** - * Check instance status in case that session id is overdue and app id is null and so we cannot update instance state. + * Check instance status in case that session id is overdue and app id is + * null and so we cannot update instance state + * . * * @param instance job instance bean * @param e HttpClientErrorException * @return boolean */ - private boolean checkStatus(JobInstanceBean instance, HttpClientErrorException e) { + private boolean checkStatus(JobInstanceBean instance, + HttpClientErrorException e) { int code = e.getStatusCode().value(); String appId = instance.getAppId(); String responseBody = e.getResponseBodyAsString(); Long sessionId = instance.getSessionId(); sessionId = sessionId != null ? sessionId : -1; - // If code is 404 and appId is null and response body is like 'Session {id} not found', - // this means instance may not be scheduled for a long time by spark for too many tasks. It may be dead. - if (code == 404 && appId == null && (responseBody != null && responseBody.contains(sessionId.toString()))) { + // If code is 404 and appId is null and response body is like 'Session + // {id} not found',this means instance may not be scheduled for + // a long time by spark for too many tasks. It may be dead. + if (code == 404 && appId == null && (responseBody != null && + responseBody.contains(sessionId.toString()))) { instance.setState(DEAD); instance.setDeleted(true); instanceRepo.save(instance); @@ -514,7 +563,8 @@ public class JobServiceImpl implements JobService { } private void setStateByYarn(JobInstanceBean instance) { - LOGGER.warn("Spark session {} may be overdue! 
Now we use yarn to update state.", instance.getSessionId()); + LOGGER.warn("Spark session {} may be overdue! " + + "Now we use yarn to update state.", instance.getSessionId()); String yarnUrl = env.getProperty("yarn.uri"); boolean success = YarnNetUtil.update(yarnUrl, instance); if (!success) { @@ -527,34 +577,43 @@ public class JobServiceImpl implements JobService { } - private void setJobInstanceIdAndUri(JobInstanceBean instance, HashMap<String, Object> resultMap) { + private void setJobInstanceIdAndUri(JobInstanceBean instance, HashMap<String + , Object> resultMap) { if (resultMap != null) { Object state = resultMap.get("state"); Object appId = resultMap.get("appId"); - instance.setState(state == null ? null : LivySessionStates.State.valueOf(state.toString().toUpperCase())); + instance.setState(state == null ? null : LivySessionStates.State + .valueOf(state.toString().toUpperCase + ())); instance.setAppId(appId == null ? null : appId.toString()); - instance.setAppUri(appId == null ? null : env.getProperty("yarn.uri") + "/cluster/app/" + appId); + instance.setAppUri(appId == null ? 
null : env + .getProperty("yarn.uri") + " /cluster/app/ " + appId); instanceRepo.save(instance); } } public Boolean isJobHealthy(Long jobId) { Pageable pageable = new PageRequest(0, 1, Sort.Direction.DESC, "tms"); - List<JobInstanceBean> instances = instanceRepo.findByJobId(jobId, pageable); - return !CollectionUtils.isEmpty(instances) && LivySessionStates.isHealthy(instances.get(0).getState()); + List<JobInstanceBean> instances = instanceRepo.findByJobId(jobId, + pageable); + return !CollectionUtils.isEmpty(instances) && LivySessionStates + .isHealthy(instances.get(0).getState()); } @Override public String getJobHdfsSinksPath(String jobName, long timestamp) { - List<AbstractJob> jobList = jobRepo.findByJobNameAndDeleted(jobName, false); + List<AbstractJob> jobList = jobRepo.findByJobNameAndDeleted( + jobName, false); if (jobList.size() == 0) { return null; } if (jobList.get(0).getType().toLowerCase().equals("batch")) { - return getSinksPath(ENV_BATCH) + "/" + jobName + "/" + timestamp + ""; + return getSinksPath(ENV_BATCH) + + "/" + jobName + "/" + timestamp + ""; } - return getSinksPath(ENV_STREAMING) + "/" + jobName + "/" + timestamp + ""; + return getSinksPath(ENV_STREAMING) + + "/" + jobName + "/" + timestamp + ""; } private String getSinksPath(String jsonString) { @@ -563,8 +622,10 @@ public class JobServiceImpl implements JobService { JSONArray persistArray = obj.getJSONArray("sinks"); for (int i = 0; i < persistArray.length(); i++) { Object type = persistArray.getJSONObject(i).get("type"); - if (type instanceof String && "hdfs".equalsIgnoreCase(String.valueOf(type))) { - return persistArray.getJSONObject(i).getJSONObject("config").getString("path"); + if (type instanceof String + && "hdfs".equalsIgnoreCase(String.valueOf(type))) { + return persistArray.getJSONObject(i) + .getJSONObject("config").getString("path"); } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/3bbbcb32/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java 
---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java index 19e746f..64478b8 100644 --- a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java +++ b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java @@ -68,7 +68,8 @@ import org.springframework.web.client.RestTemplate; @DisallowConcurrentExecution @Component public class SparkSubmitJob implements Job { - private static final Logger LOGGER = LoggerFactory.getLogger(SparkSubmitJob.class); + private static final Logger LOGGER = + LoggerFactory.getLogger(SparkSubmitJob.class); @Autowired private JobInstanceRepo jobInstanceRepo; @@ -99,7 +100,8 @@ public class SparkSubmitJob implements Job { } } - private void updateJobInstanceState(JobExecutionContext context) throws IOException { + private void updateJobInstanceState(JobExecutionContext context) throws + IOException { SimpleTrigger simpleTrigger = (SimpleTrigger) context.getTrigger(); int repeatCount = simpleTrigger.getRepeatCount(); int fireCount = simpleTrigger.getTimesTriggered(); @@ -111,10 +113,13 @@ public class SparkSubmitJob implements Job { private String post2Livy() { String result = null; try { - result = restTemplate.postForObject(livyUri, livyConfMap, String.class); + result = restTemplate.postForObject(livyUri, livyConfMap, + String.class); LOGGER.info(result); } catch (HttpClientErrorException e) { - LOGGER.error("Post to livy ERROR. \n {} {}", e.getMessage(), e.getResponseBodyAsString()); + LOGGER.error("Post to livy ERROR. \n {} {}", + e.getMessage(), + e.getResponseBodyAsString()); } catch (Exception e) { LOGGER.error("Post to livy ERROR. 
{}", e.getMessage()); } @@ -126,7 +131,8 @@ public class SparkSubmitJob implements Job { return true; } for (SegmentPredicate segPredicate : predicates) { - Predicator predicator = PredicatorFactory.newPredicateInstance(segPredicate); + Predicator predicator = PredicatorFactory + .newPredicateInstance(segPredicate); try { if (predicator != null && !predicator.predicate()) { return false; @@ -141,11 +147,14 @@ public class SparkSubmitJob implements Job { private void initParam(JobDetail jd) throws IOException { mPredicates = new ArrayList<>(); - jobInstance = jobInstanceRepo.findByPredicateName(jd.getJobDataMap().getString(PREDICATE_JOB_NAME)); - measure = toEntity(jd.getJobDataMap().getString(MEASURE_KEY), GriffinMeasure.class); + jobInstance = jobInstanceRepo.findByPredicateName(jd.getJobDataMap() + .getString(PREDICATE_JOB_NAME)); + measure = toEntity(jd.getJobDataMap().getString(MEASURE_KEY), + GriffinMeasure.class); livyUri = env.getProperty("livy.uri"); setPredicates(jd.getJobDataMap().getString(PREDICATES_KEY)); - // in order to keep metric name unique, we set job name as measure name at present + // in order to keep metric name unique, we set job name + // as measure name at present measure.setName(jd.getJobDataMap().getString(JOB_NAME)); } @@ -154,8 +163,9 @@ public class SparkSubmitJob implements Job { if (StringUtils.isEmpty(json)) { return; } - List<Map<String, Object>> maps = toEntity(json, new TypeReference<List<Map>>() { - }); + List<Map<String, Object>> maps = toEntity(json, + new TypeReference<List<Map>>() { + }); for (Map<String, Object> map : maps) { SegmentPredicate sp = new SegmentPredicate(); sp.setType((String) map.get("type")); @@ -195,8 +205,10 @@ public class SparkSubmitJob implements Job { } - private void saveJobInstance(JobDetail jd) throws SchedulerException, IOException { - // If result is null, it may livy uri is wrong or livy parameter is wrong. 
+ private void saveJobInstance(JobDetail jd) throws SchedulerException, + IOException { + // If result is null, it may livy uri is wrong + // or livy parameter is wrong. String result = post2Livy(); String group = jd.getKey().getGroup(); String name = jd.getKey().getName(); @@ -205,9 +217,11 @@ public class SparkSubmitJob implements Job { saveJobInstance(result, FOUND); } - private void saveJobInstance(String result, State state) throws IOException { - TypeReference<HashMap<String, Object>> type = new TypeReference<HashMap<String, Object>>() { - }; + private void saveJobInstance(String result, State state) + throws IOException { + TypeReference<HashMap<String, Object>> type = + new TypeReference<HashMap<String, Object>>() { + }; Map<String, Object> resultMap = null; if (result != null) { resultMap = toEntity(result, type); @@ -223,8 +237,10 @@ public class SparkSubmitJob implements Job { Object status = resultMap.get("state"); Object id = resultMap.get("id"); Object appId = resultMap.get("appId"); - jobInstance.setState(status == null ? null : State.valueOf(status.toString().toUpperCase())); - jobInstance.setSessionId(id == null ? null : Long.parseLong(id.toString())); + jobInstance.setState(status == null ? null : State.valueOf(status + .toString().toUpperCase())); + jobInstance.setSessionId(id == null ? null : Long.parseLong(id + .toString())); jobInstance.setAppId(appId == null ? null : appId.toString()); } }
