Repository: incubator-griffin Updated Branches: refs/heads/master 165ef3ded -> f1781ab8b
Define griffin plain-vanilla hook. the purpose of hook is for integration with components outside. Griffin would offer information about internal task status. Task: GRIFFIN-200 Author: Eugene <liu...@apache.org> Author: William Guo <gu...@apache.org> Closes #444 from toyboxman/hook. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/f1781ab8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/f1781ab8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/f1781ab8 Branch: refs/heads/master Commit: f1781ab8bc06426425a3f9f50f612f7f5ad03379 Parents: 165ef3d Author: Eugene <liu...@apache.org> Authored: Fri Nov 9 22:46:40 2018 +0800 Committer: William Guo <gu...@apache.org> Committed: Fri Nov 9 22:46:40 2018 +0800 ---------------------------------------------------------------------- .../griffin/core/event/EventPointcutType.java | 26 +++ .../griffin/core/event/EventSourceType.java | 25 +++ .../apache/griffin/core/event/EventType.java | 26 +++ .../core/event/GriffinAbstractEvent.java | 57 ++++++ .../apache/griffin/core/event/GriffinEvent.java | 54 ++++++ .../griffin/core/event/GriffinEventManager.java | 59 ++++++ .../apache/griffin/core/event/GriffinHook.java | 42 +++++ .../org/apache/griffin/core/event/JobEvent.java | 60 ++++++ .../apache/griffin/core/event/JobEventHook.java | 31 ++++ .../core/exception/GriffinException.java | 5 + .../apache/griffin/core/job/JobServiceImpl.java | 183 ++++++++++--------- .../src/main/resources/application.properties | 2 + .../griffin/core/job/EventServiceTest.java | 107 +++++++++++ .../src/test/resources/application.properties | 8 +- 14 files changed, 599 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f1781ab8/service/src/main/java/org/apache/griffin/core/event/EventPointcutType.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/event/EventPointcutType.java b/service/src/main/java/org/apache/griffin/core/event/EventPointcutType.java new file mode 100644 index 0000000..166372d --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/event/EventPointcutType.java @@ -0,0 +1,26 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.event; + +public enum EventPointcutType { + BEFORE, + PENDING, + AFTER +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f1781ab8/service/src/main/java/org/apache/griffin/core/event/EventSourceType.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/event/EventSourceType.java b/service/src/main/java/org/apache/griffin/core/event/EventSourceType.java new file mode 100644 index 0000000..6bca570 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/event/EventSourceType.java @@ -0,0 +1,25 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.event; + +public enum EventSourceType { + JOB, + MEASURE +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f1781ab8/service/src/main/java/org/apache/griffin/core/event/EventType.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/event/EventType.java b/service/src/main/java/org/apache/griffin/core/event/EventType.java new file mode 100644 index 0000000..b00a9b0 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/event/EventType.java @@ -0,0 +1,26 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.event; + +public enum EventType { + CREATION_EVENT, + CHANGE_EVENT, + REMOVAL_EVENT +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f1781ab8/service/src/main/java/org/apache/griffin/core/event/GriffinAbstractEvent.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/event/GriffinAbstractEvent.java b/service/src/main/java/org/apache/griffin/core/event/GriffinAbstractEvent.java new file mode 100644 index 0000000..db29e44 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/event/GriffinAbstractEvent.java @@ -0,0 +1,57 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.event; + +public abstract class GriffinAbstractEvent<T> implements GriffinEvent<T> { + private T source; + private EventType type; + private EventSourceType sourceType; + private EventPointcutType pointcutType; + + public GriffinAbstractEvent(T source, + EventType type, + EventSourceType sourceType, + EventPointcutType pointcutType) { + this.source = source; + this.type = type; + this.sourceType = sourceType; + this.pointcutType = pointcutType; + } + + @Override + public EventType getType() { + return this.type; + } + + @Override + public EventPointcutType getPointcut() { + return pointcutType; + } + + @Override + public EventSourceType getSourceType() { + return sourceType; + } + + @Override + public T getSource() { + return source; + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f1781ab8/service/src/main/java/org/apache/griffin/core/event/GriffinEvent.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/event/GriffinEvent.java b/service/src/main/java/org/apache/griffin/core/event/GriffinEvent.java new file mode 100644 index 0000000..cff6163 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/event/GriffinEvent.java @@ -0,0 +1,54 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.event; + +/** + * A semantic event which indicates that a griffin-defined action occurred. + * This high-level event is generated by an action (such as an + * <code>addJob</code>) when the task-specific action occurs. + * The event is passed to every <code>GriffinHook</code> object + * that registered to receive such events using configuration. + * + * @author Eugene Liu + * @since 0.3 + */ +public interface GriffinEvent<T> { + /** + * @return concrete event type + */ + EventType getType(); + + /** + * @return concrete event pointcut type + */ + EventPointcutType getPointcut(); + + /** + * @return concrete event source type + */ + EventSourceType getSourceType(); + + /** + * The object on which the Event initially occurred. + * + * @return The object on which the Event initially occurred. + */ + T getSource(); +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f1781ab8/service/src/main/java/org/apache/griffin/core/event/GriffinEventManager.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/event/GriffinEventManager.java b/service/src/main/java/org/apache/griffin/core/event/GriffinEventManager.java new file mode 100644 index 0000000..996d7a7 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/event/GriffinEventManager.java @@ -0,0 +1,59 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.event; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +@Component +public class GriffinEventManager { + @Autowired + private ApplicationContext applicationContext; + + @Value("#{'${internal.event.listeners}'.split(',')}") + private Set<String> enabledListeners; + + private List<GriffinHook> eventListeners; + + @PostConstruct + void initializeListeners() { + List<GriffinHook> eventListeners = new ArrayList<>(); + applicationContext.getBeansOfType(GriffinHook.class) + .forEach((beanName, listener) -> { + if (enabledListeners.contains(beanName)) { + eventListeners.add(listener); + } + }); + this.eventListeners = eventListeners; + } + + public void notifyListeners(GriffinEvent event) { + eventListeners.forEach(listener -> { + listener.onEvent(event); + }); + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f1781ab8/service/src/main/java/org/apache/griffin/core/event/GriffinHook.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/event/GriffinHook.java b/service/src/main/java/org/apache/griffin/core/event/GriffinHook.java new file mode 100644 index 0000000..5090648 --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/event/GriffinHook.java @@ -0,0 +1,42 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.event; + +import org.apache.griffin.core.exception.GriffinException; + +/** + * The Hook interface for receiving internal events. + * The class that is interested in processing an event + * implements this interface, and the object created with that + * class is registered to griffin, using the configuration. + * When the event occurs, that object's <code>onEvent</code> method is + * invoked. + * + * @author Eugene Liu + * @since 0.3 + */ +public interface GriffinHook { + /** + * Invoked when an action occurs. + * + * @see GriffinEvent + */ + void onEvent(GriffinEvent event) throws GriffinException; +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f1781ab8/service/src/main/java/org/apache/griffin/core/event/JobEvent.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/event/JobEvent.java b/service/src/main/java/org/apache/griffin/core/event/JobEvent.java new file mode 100644 index 0000000..5fdea0f --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/event/JobEvent.java @@ -0,0 +1,60 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.event; + +import org.apache.griffin.core.job.entity.AbstractJob; + +public class JobEvent extends GriffinAbstractEvent<AbstractJob> { + + private JobEvent(AbstractJob source, + EventType type, + EventSourceType sourceType, + EventPointcutType pointcutType) { + super(source, type, sourceType, pointcutType); + } + + public static JobEvent yieldJobEventBeforeCreation(AbstractJob source) { + return new JobEvent(source, + EventType.CREATION_EVENT, + EventSourceType.JOB, + EventPointcutType.BEFORE); + } + + public static JobEvent yieldJobEventAfterCreation(AbstractJob source) { + return new JobEvent(source, + EventType.CREATION_EVENT, + EventSourceType.JOB, + EventPointcutType.AFTER); + } + + public static JobEvent yieldJobEventBeforeRemoval(AbstractJob source) { + return new JobEvent(source, + EventType.REMOVAL_EVENT, + EventSourceType.JOB, + EventPointcutType.BEFORE); + } + + public static JobEvent yieldJobEventAfterRemoval(AbstractJob source) { + return new JobEvent(source, + EventType.REMOVAL_EVENT, + EventSourceType.JOB, + EventPointcutType.AFTER); + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f1781ab8/service/src/main/java/org/apache/griffin/core/event/JobEventHook.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/event/JobEventHook.java b/service/src/main/java/org/apache/griffin/core/event/JobEventHook.java new file mode 100644 index 0000000..5c95f3e --- /dev/null +++ b/service/src/main/java/org/apache/griffin/core/event/JobEventHook.java @@ -0,0 +1,31 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.event; + +import org.apache.griffin.core.exception.GriffinException; +import org.springframework.context.annotation.Configuration; + +@Configuration(value = "GriffinJobEventHook") +public class JobEventHook implements GriffinHook { + @Override + public void onEvent(GriffinEvent event) throws GriffinException { + // This method needs to be reimplemented by event-consuming purpose + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f1781ab8/service/src/main/java/org/apache/griffin/core/exception/GriffinException.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/exception/GriffinException.java b/service/src/main/java/org/apache/griffin/core/exception/GriffinException.java index db532ab..f1cdaba 100644 --- a/service/src/main/java/org/apache/griffin/core/exception/GriffinException.java +++ b/service/src/main/java/org/apache/griffin/core/exception/GriffinException.java @@ -60,4 +60,9 @@ public abstract class GriffinException extends RuntimeException { } } + public static class UnImplementedException extends GriffinException { + public UnImplementedException(String message) { + super(message); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f1781ab8/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java index ac14461..4768efc 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java @@ -19,43 +19,12 @@ under the License. package org.apache.griffin.core.job; -import static java.util.TimeZone.getTimeZone; -import static org.apache.griffin.core.config.EnvConfig.ENV_BATCH; -import static org.apache.griffin.core.config.EnvConfig.ENV_STREAMING; -import static org.apache.griffin.core.exception.GriffinExceptionMessage.INVALID_MEASURE_ID; -import static org.apache.griffin.core.exception.GriffinExceptionMessage.JOB_ID_DOES_NOT_EXIST; -import static org.apache.griffin.core.exception.GriffinExceptionMessage.JOB_NAME_DOES_NOT_EXIST; -import static org.apache.griffin.core.exception.GriffinExceptionMessage.JOB_TYPE_DOES_NOT_SUPPORT; -import static org.apache.griffin.core.exception.GriffinExceptionMessage.MEASURE_TYPE_DOES_NOT_SUPPORT; -import static org.apache.griffin.core.exception.GriffinExceptionMessage.NO_SUCH_JOB_ACTION; -import static org.apache.griffin.core.exception.GriffinExceptionMessage.QUARTZ_JOB_ALREADY_EXIST; -import static org.apache.griffin.core.job.entity.LivySessionStates.State.BUSY; -import static org.apache.griffin.core.job.entity.LivySessionStates.State.DEAD; -import static org.apache.griffin.core.job.entity.LivySessionStates.State.IDLE; -import static org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_STARTED; -import static org.apache.griffin.core.job.entity.LivySessionStates.State.RECOVERING; -import static org.apache.griffin.core.job.entity.LivySessionStates.State.RUNNING; -import static org.apache.griffin.core.job.entity.LivySessionStates.State.STARTING; -import static org.apache.griffin.core.job.entity.LivySessionStates.State.UNKNOWN; -import static org.apache.griffin.core.job.entity.LivySessionStates.isActive; -import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH; -import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.STREAMING; -import static org.quartz.CronScheduleBuilder.cronSchedule; -import static org.quartz.JobBuilder.newJob; -import static org.quartz.JobKey.jobKey; -import static org.quartz.SimpleScheduleBuilder.simpleSchedule; -import static org.quartz.TriggerBuilder.newTrigger; -import static org.quartz.TriggerKey.triggerKey; - import com.fasterxml.jackson.core.type.TypeReference; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.TimeZone; - import org.apache.commons.lang.StringUtils; +import org.apache.griffin.core.event.GriffinEventManager; import org.apache.griffin.core.exception.GriffinException; +import org.apache.griffin.core.event.JobEvent; import org.apache.griffin.core.job.entity.AbstractJob; import org.apache.griffin.core.job.entity.BatchJob; import org.apache.griffin.core.job.entity.JobHealth; @@ -99,10 +68,43 @@ import org.springframework.web.client.HttpClientErrorException; import org.springframework.web.client.ResourceAccessException; import org.springframework.web.client.RestTemplate; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.TimeZone; + +import static java.util.TimeZone.getTimeZone; +import static org.apache.griffin.core.config.EnvConfig.ENV_BATCH; +import static org.apache.griffin.core.config.EnvConfig.ENV_STREAMING; +import static org.apache.griffin.core.exception.GriffinExceptionMessage.INVALID_MEASURE_ID; +import static org.apache.griffin.core.exception.GriffinExceptionMessage.JOB_ID_DOES_NOT_EXIST; +import static org.apache.griffin.core.exception.GriffinExceptionMessage.JOB_NAME_DOES_NOT_EXIST; +import static org.apache.griffin.core.exception.GriffinExceptionMessage.JOB_TYPE_DOES_NOT_SUPPORT; +import static org.apache.griffin.core.exception.GriffinExceptionMessage.MEASURE_TYPE_DOES_NOT_SUPPORT; +import static org.apache.griffin.core.exception.GriffinExceptionMessage.NO_SUCH_JOB_ACTION; +import static org.apache.griffin.core.exception.GriffinExceptionMessage.QUARTZ_JOB_ALREADY_EXIST; +import static org.apache.griffin.core.job.entity.LivySessionStates.State.BUSY; +import static org.apache.griffin.core.job.entity.LivySessionStates.State.DEAD; +import static org.apache.griffin.core.job.entity.LivySessionStates.State.IDLE; +import static org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_STARTED; +import static org.apache.griffin.core.job.entity.LivySessionStates.State.RECOVERING; +import static org.apache.griffin.core.job.entity.LivySessionStates.State.RUNNING; +import static org.apache.griffin.core.job.entity.LivySessionStates.State.STARTING; +import static org.apache.griffin.core.job.entity.LivySessionStates.State.UNKNOWN; +import static org.apache.griffin.core.job.entity.LivySessionStates.isActive; +import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH; +import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.STREAMING; +import static org.quartz.CronScheduleBuilder.cronSchedule; +import static org.quartz.JobBuilder.newJob; +import static org.quartz.JobKey.jobKey; +import static org.quartz.SimpleScheduleBuilder.simpleSchedule; +import static org.quartz.TriggerBuilder.newTrigger; +import static org.quartz.TriggerKey.triggerKey; + @Service public class JobServiceImpl implements JobService { private static final Logger LOGGER = LoggerFactory - .getLogger(JobServiceImpl.class); + .getLogger(JobServiceImpl.class); public static final String GRIFFIN_JOB_ID = "griffinJobId"; private static final int MAX_PAGE_SIZE = 1024; private static final int DEFAULT_PAGE_SIZE = 10; @@ -127,6 +129,8 @@ public class JobServiceImpl implements JobService { private BatchJobOperatorImpl batchJobOp; @Autowired private StreamingJobOperatorImpl streamingJobOp; + @Autowired + private GriffinEventManager eventManager; private RestTemplate restTemplate; @@ -158,17 +162,22 @@ public class JobServiceImpl implements JobService { } catch (SchedulerException e) { LOGGER.error("Failed to get RUNNING jobs.", e); throw new GriffinException - .ServiceException("Failed to get RUNNING jobs.", e); + .ServiceException("Failed to get RUNNING jobs.", e); } return dataList; } @Override public AbstractJob addJob(AbstractJob job) throws Exception { + JobEvent jobEvent = JobEvent.yieldJobEventBeforeCreation(job); + eventManager.notifyListeners(jobEvent); Long measureId = job.getMeasureId(); GriffinMeasure measure = getMeasureIfValid(measureId); JobOperator op = getJobOperator(measure.getProcessType()); - return op.add(job, measure); + AbstractJob jobSaved = op.add(job, measure); + jobEvent = JobEvent.yieldJobEventAfterCreation(jobSaved); + eventManager.notifyListeners(jobEvent); + return jobSaved; } @Override @@ -177,7 +186,7 @@ public class JobServiceImpl implements JobService { if (job == null) { LOGGER.warn("Job id {} does not exist.", jobId); throw new GriffinException - .NotFoundException(JOB_ID_DOES_NOT_EXIST); + .NotFoundException(JOB_ID_DOES_NOT_EXIST); } return job; } @@ -198,7 +207,7 @@ public class JobServiceImpl implements JobService { } private void doAction(String action, AbstractJob job, JobOperator op) - throws Exception { + throws Exception { switch (action) { case START: op.start(job); @@ -208,7 +217,7 @@ public class JobServiceImpl implements JobService { break; default: throw new GriffinException - .NotFoundException(NO_SUCH_JOB_ACTION); + .NotFoundException(NO_SUCH_JOB_ACTION); } } @@ -224,8 +233,12 @@ public class JobServiceImpl implements JobService { public void deleteJob(Long jobId) throws SchedulerException { AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false); validateJobExist(job); + JobEvent event = JobEvent.yieldJobEventBeforeRemoval(job); + eventManager.notifyListeners(event); JobOperator op = getJobOperator(job); op.delete(job); + event = JobEvent.yieldJobEventAfterRemoval(job); + eventManager.notifyListeners(event); } /** @@ -239,31 +252,35 @@ public class JobServiceImpl implements JobService { if (CollectionUtils.isEmpty(jobs)) { LOGGER.warn("There is no job with '{}' name.", name); throw new GriffinException - .NotFoundException(JOB_NAME_DOES_NOT_EXIST); + .NotFoundException(JOB_NAME_DOES_NOT_EXIST); } for (AbstractJob job : jobs) { + JobEvent event = JobEvent.yieldJobEventBeforeRemoval(job); + eventManager.notifyListeners(event); JobOperator op = getJobOperator(job); op.delete(job); + event = JobEvent.yieldJobEventAfterRemoval(job); + eventManager.notifyListeners(event); } } @Override public List<JobInstanceBean> findInstancesOfJob( - Long jobId, - int page, - int size) { + Long jobId, + int page, + int size) { AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false); if (job == null) { LOGGER.warn("Job id {} does not exist.", jobId); throw new GriffinException - .NotFoundException(JOB_ID_DOES_NOT_EXIST); + .NotFoundException(JOB_ID_DOES_NOT_EXIST); } size = size > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : size; size = size <= 0 ? DEFAULT_PAGE_SIZE : size; Pageable pageable = new PageRequest(page, size, - Sort.Direction.DESC, "tms"); + Sort.Direction.DESC, "tms"); List<JobInstanceBean> instances = instanceRepo.findByJobId(jobId, - pageable); + pageable); return updateState(instances); } @@ -294,7 +311,7 @@ public class JobServiceImpl implements JobService { } catch (SchedulerException e) { LOGGER.error("Job schedule exception. {}", e); throw new GriffinException - .ServiceException("Fail to Get HealthInfo", e); + .ServiceException("Fail to Get HealthInfo", e); } } @@ -305,8 +322,8 @@ public class JobServiceImpl implements JobService { public void deleteExpiredJobInstance() { Long timeMills = System.currentTimeMillis(); List<JobInstanceBean> instances = instanceRepo - .findByExpireTmsLessThanEqual - (timeMills); + .findByExpireTmsLessThanEqual + (timeMills); if (!batchJobOp.pauseJobInstances(instances)) { LOGGER.error("Pause job failure."); return; @@ -329,7 +346,7 @@ public class JobServiceImpl implements JobService { return streamingJobOp; } throw new GriffinException.BadRequestException - (JOB_TYPE_DOES_NOT_SUPPORT); + (JOB_TYPE_DOES_NOT_SUPPORT); } private JobOperator getJobOperator(ProcessType type) { @@ -339,21 +356,21 @@ public class JobServiceImpl implements JobService { return streamingJobOp; } throw new GriffinException.BadRequestException - (MEASURE_TYPE_DOES_NOT_SUPPORT); + (MEASURE_TYPE_DOES_NOT_SUPPORT); } TriggerKey getTriggerKeyIfValid(String qName, String qGroup) throws - SchedulerException { + SchedulerException { TriggerKey triggerKey = triggerKey(qName, qGroup); if (factory.getScheduler().checkExists(triggerKey)) { throw new GriffinException.ConflictException - (QUARTZ_JOB_ALREADY_EXIST); + (QUARTZ_JOB_ALREADY_EXIST); } return triggerKey; } List<? extends Trigger> getTriggers(String name, String group) throws - SchedulerException { + SchedulerException { if (name == null || group == null) { return null; } @@ -363,7 +380,7 @@ public class JobServiceImpl implements JobService { } private JobState genJobState(AbstractJob job, String action) throws - SchedulerException { + SchedulerException { JobOperator op = getJobOperator(job); JobState state = op.getState(job, action); job.setJobState(state); @@ -375,7 +392,7 @@ public class JobServiceImpl implements JobService { } void addJob(TriggerKey tk, AbstractJob job, ProcessType type) throws - Exception { + Exception { JobDetail jobDetail = addJobDetail(tk, job); Trigger trigger = genTriggerInstance(tk, jobDetail, job, type); factory.getScheduler().scheduleJob(trigger); @@ -405,34 +422,34 @@ public class JobServiceImpl implements JobService { private GriffinMeasure getMeasureIfValid(Long measureId) { GriffinMeasure measure = measureRepo.findByIdAndDeleted(measureId, - false); + false); if (measure == null) { LOGGER.warn("The measure id {} isn't valid. Maybe it doesn't " + - "exist or is external measure type.", - measureId); + "exist or is external measure type.", + measureId); throw new GriffinException.BadRequestException(INVALID_MEASURE_ID); } return measure; } private Trigger genTriggerInstance(TriggerKey tk, JobDetail jd, AbstractJob - job, ProcessType type) { + job, ProcessType type) { TriggerBuilder builder = newTrigger().withIdentity(tk).forJob(jd); if (type == BATCH) { TimeZone timeZone = getTimeZone(job.getTimeZone()); return builder.withSchedule(cronSchedule(job.getCronExpression()) - .inTimeZone(timeZone)).build(); + .inTimeZone(timeZone)).build(); } else if (type == STREAMING) { return builder.startNow().withSchedule(simpleSchedule() - .withRepeatCount(0)).build(); + .withRepeatCount(0)).build(); } throw new GriffinException.BadRequestException - (JOB_TYPE_DOES_NOT_SUPPORT); + (JOB_TYPE_DOES_NOT_SUPPORT); } private JobDetail addJobDetail(TriggerKey triggerKey, AbstractJob job) - throws SchedulerException { + throws SchedulerException { Scheduler scheduler = factory.getScheduler(); JobKey jobKey = jobKey(triggerKey.getName(), triggerKey.getGroup()); JobDetail jobDetail; @@ -441,7 +458,7 @@ public class JobServiceImpl implements JobService { jobDetail = scheduler.getJobDetail(jobKey); } else { jobDetail = newJob(JobInstance.class).storeDurably().withIdentity - (jobKey).build(); + (jobKey).build(); } setJobDataMap(jobDetail, job); scheduler.addJob(jobDetail, isJobKeyExist); @@ -462,9 +479,9 @@ public class JobServiceImpl implements JobService { * @param measureId measure id */ public void deleteJobsRelateToMeasure(Long measureId) throws - SchedulerException { + SchedulerException { List<AbstractJob> jobs = jobRepo.findByMeasureIdAndDeleted(measureId, - false); + false); if (CollectionUtils.isEmpty(jobs)) { LOGGER.info("Measure id {} has no related jobs.", measureId); return; @@ -478,7 +495,7 @@ public class JobServiceImpl implements JobService { @Scheduled(fixedDelayString = "${jobInstance.fixedDelay.in.milliseconds}") public void syncInstancesOfAllJobs() { LivySessionStates.State[] states = {STARTING, NOT_STARTED, RECOVERING, - IDLE, RUNNING, BUSY}; + IDLE, RUNNING, BUSY}; List<JobInstanceBean> beans = instanceRepo.findByActiveState(states); for (JobInstanceBean jobInstance : beans) { syncInstancesOfJob(jobInstance); @@ -496,21 +513,21 @@ public class JobServiceImpl implements JobService { return; } String uri = env.getProperty("livy.uri") + "/" - + instance.getSessionId(); + + instance.getSessionId(); TypeReference<HashMap<String, Object>> type = - new TypeReference<HashMap<String, Object>>() { - }; + new TypeReference<HashMap<String, Object>>() { + }; try { String resultStr = restTemplate.getForObject(uri, String.class); HashMap<String, Object> resultMap = JsonUtil.toEntity(resultStr, - type); + type); setJobInstanceIdAndUri(instance, resultMap); } catch (ResourceAccessException e) { LOGGER.error("Your url may be wrong. Please check {}.\n {}", uri, e - .getMessage()); + .getMessage()); } catch (HttpClientErrorException e) { LOGGER.warn("sessionId({}) appId({}) {}.", instance.getSessionId(), - instance.getAppId(), e.getMessage()); + instance.getAppId(), e.getMessage()); setStateByYarn(instance, e); } catch (Exception e) { LOGGER.error(e.getMessage()); @@ -523,7 +540,7 @@ public class JobServiceImpl implements JobService { if (!checkStatus(instance, e)) { int code = e.getStatusCode().value(); boolean match = (code == 400 || code == 404) - && instance.getAppId() != null; + && instance.getAppId() != null; //this means your url is correct,but your param is wrong or livy //session may be overdue. if (match) { @@ -553,7 +570,7 @@ public class JobServiceImpl implements JobService { // {id} not found',this means instance may not be scheduled for // a long time by spark for too many tasks. It may be dead. if (code == 404 && appId == null && (responseBody != null && - responseBody.contains(sessionId.toString()))) { + responseBody.contains(sessionId.toString()))) { instance.setState(DEAD); instance.setDeleted(true); instanceRepo.save(instance); @@ -564,7 +581,7 @@ public class JobServiceImpl implements JobService { private void setStateByYarn(JobInstanceBean instance) { LOGGER.warn("Spark session {} may be overdue! " + - "Now we use yarn to update state.", instance.getSessionId()); + "Now we use yarn to update state.", instance.getSessionId()); String yarnUrl = env.getProperty("yarn.uri"); boolean success = YarnNetUtil.update(yarnUrl, instance); if (!success) { @@ -578,16 +595,16 @@ public class JobServiceImpl implements JobService { private void setJobInstanceIdAndUri(JobInstanceBean instance, HashMap<String - , Object> resultMap) { + , Object> resultMap) { if (resultMap != null) { Object state = resultMap.get("state"); Object appId = resultMap.get("appId"); instance.setState(state == null ? null : LivySessionStates.State - .valueOf(state.toString().toUpperCase - ())); + .valueOf(state.toString().toUpperCase + ())); instance.setAppId(appId == null ? null : appId.toString()); instance.setAppUri(appId == null ? null : env - .getProperty("yarn.uri") + " /cluster/app/ " + appId); + .getProperty("yarn.uri") + " /cluster/app/ " + appId); instanceRepo.save(instance); } } @@ -595,9 +612,9 @@ public class JobServiceImpl implements JobService { public Boolean isJobHealthy(Long jobId) { Pageable pageable = new PageRequest(0, 1, Sort.Direction.DESC, "tms"); List<JobInstanceBean> instances = instanceRepo.findByJobId(jobId, - pageable); + pageable); return !CollectionUtils.isEmpty(instances) && LivySessionStates - .isHealthy(instances.get(0).getState()); + .isHealthy(instances.get(0).getState()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f1781ab8/service/src/main/resources/application.properties ---------------------------------------------------------------------- diff --git a/service/src/main/resources/application.properties b/service/src/main/resources/application.properties index 2e9c8f7..1c26319 100644 --- a/service/src/main/resources/application.properties +++ b/service/src/main/resources/application.properties @@ -62,3 +62,5 @@ elasticsearch.scheme=http livy.uri=http://localhost:8998/batches # yarn url yarn.uri=http://localhost:8088 +# griffin event listener +internal.event.listeners=GriffinJobEventHook http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f1781ab8/service/src/test/java/org/apache/griffin/core/job/EventServiceTest.java ---------------------------------------------------------------------- diff --git a/service/src/test/java/org/apache/griffin/core/job/EventServiceTest.java b/service/src/test/java/org/apache/griffin/core/job/EventServiceTest.java new file mode 100644 index 0000000..2cbbd84 --- /dev/null +++ b/service/src/test/java/org/apache/griffin/core/job/EventServiceTest.java @@ -0,0 +1,107 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.griffin.core.job; + +import static org.apache.griffin.core.util.EntityHelper.createGriffinMeasure; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.griffin.core.event.EventSourceType; +import org.apache.griffin.core.event.EventType; +import org.apache.griffin.core.event.GriffinEvent; +import org.apache.griffin.core.event.GriffinHook; +import org.apache.griffin.core.exception.GriffinException; +import org.apache.griffin.core.job.entity.BatchJob; +import org.apache.griffin.core.job.entity.JobDataSegment; +import org.apache.griffin.core.measure.entity.GriffinMeasure; +import org.apache.griffin.core.measure.entity.Measure; +import org.apache.griffin.core.util.EntityHelper; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest; +import org.springframework.boot.test.autoconfigure.orm.jpa.TestEntityManager; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.test.context.junit4.SpringRunner; + +@RunWith(SpringRunner.class) +@DataJpaTest +@ComponentScan("org.apache.griffin.core") +public class EventServiceTest { + @Autowired + private JobService jobService; + + @Autowired + private TestEntityManager entityManager; + + @Autowired + private List<GriffinEvent> eventList; + + @Before + public void setup() throws Exception { + entityManager.clear(); + entityManager.flush(); + setEntityManager(); + } + + @Test + public void testAddJobEvent() throws Exception { + BatchJob batch_Job = EntityHelper.createGriffinJob(); + batch_Job.setCronExpression("0 0 12 * * ?"); + batch_Job.setTimeZone("Asia/Shanghai"); + JobDataSegment jds = new JobDataSegment(); + jds.setAsTsBaseline(true); + jds.setDataConnectorName("target_name"); + List jds_list = new ArrayList(); + jds_list.add(jds); + batch_Job.setSegments(jds_list); + jobService.addJob(batch_Job); + Assert.assertEquals(2, eventList.size()); + Assert.assertEquals(EventType.CREATION_EVENT, eventList.get(0).getType()); + Assert.assertEquals(EventSourceType.JOB, eventList.get(1).getSourceType()); + } + + public void setEntityManager() throws Exception { + Measure measure1 = createGriffinMeasure("m1"); + measure1.setOrganization("org1"); + ((GriffinMeasure) measure1).setProcessType(GriffinMeasure.ProcessType.BATCH); + entityManager.persistAndFlush(measure1); + } + + @Configuration(value = "GriffinTestJobEventHook") + public static class TestJobEventHook implements GriffinHook { + private List<GriffinEvent> eventList = new ArrayList<>(); + + @Override + public void onEvent(GriffinEvent event) throws GriffinException { + eventList.add(event); + } + + @Bean + public List<GriffinEvent> getReceivedEvents() { + return eventList; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f1781ab8/service/src/test/resources/application.properties ---------------------------------------------------------------------- diff --git a/service/src/test/resources/application.properties b/service/src/test/resources/application.properties index 96d28dc..86d5316 100644 --- a/service/src/test/resources/application.properties +++ b/service/src/test/resources/application.properties @@ -6,9 +6,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -56,4 +56,6 @@ elasticsearch.host=localhost elasticsearch.port=9200 elasticsearch.scheme=http # elasticsearch.user = user -# elasticsearch.password = password \ No newline at end of file +# elasticsearch.password = password +# griffin event listener +internal.event.listeners=GriffinJobEventHook,GriffinTestJobEventHook