Repository: lens Updated Branches: refs/heads/master 5db78ce18 -> d9911f618
LENS-1298: Thread should have ability to wait for Events to be processed Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/d9911f61 Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/d9911f61 Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/d9911f61 Branch: refs/heads/master Commit: d9911f618f345347f62c20184dc2023e07498bc8 Parents: 5db78ce Author: Lavkesh Lahngir <[email protected]> Authored: Tue Sep 6 15:17:34 2016 +0530 Committer: Rajat Khandelwal <[email protected]> Committed: Tue Sep 6 15:17:34 2016 +0530 ---------------------------------------------------------------------- .../server/api/events/LensEventService.java | 7 +++ .../server/api/events/SchedulerAlarmEvent.java | 7 ++- .../apache/lens/server/EventServiceImpl.java | 16 ++++++- .../lens/server/scheduler/AlarmService.java | 46 +++++++++++++++++--- .../server/scheduler/SchedulerServiceImpl.java | 1 + .../lens/server/query/TestEventService.java | 33 +++++++++++++- 6 files changed, 100 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/d9911f61/lens-server-api/src/main/java/org/apache/lens/server/api/events/LensEventService.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/events/LensEventService.java b/lens-server-api/src/main/java/org/apache/lens/server/api/events/LensEventService.java index 4536a18..7000dcc 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/events/LensEventService.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/events/LensEventService.java @@ -73,4 +73,11 @@ public interface LensEventService extends LensService { */ <T extends LensEvent> Collection<LensEventListener> getListeners(Class<T> changeType); + /** + * Process an event synchronously. + * It does not return until the processing is finished. + * @param event + * @throws LensException + */ + void notifyEventSync(LensEvent event) throws LensException; } http://git-wip-us.apache.org/repos/asf/lens/blob/d9911f61/lens-server-api/src/main/java/org/apache/lens/server/api/events/SchedulerAlarmEvent.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/events/SchedulerAlarmEvent.java b/lens-server-api/src/main/java/org/apache/lens/server/api/events/SchedulerAlarmEvent.java index 0f2dabe..7f4ec6b 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/events/SchedulerAlarmEvent.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/events/SchedulerAlarmEvent.java @@ -40,7 +40,7 @@ public class SchedulerAlarmEvent extends LensEvent { private SchedulerJobInstanceHandle previousInstance; public SchedulerAlarmEvent(SchedulerJobHandle jobHandle, DateTime nominalTime, EventType type, - SchedulerJobInstanceHandle previousInstance) { + SchedulerJobInstanceHandle previousInstance) { super(nominalTime.getMillis()); this.jobHandle = jobHandle; this.nominalTime = nominalTime; @@ -53,6 +53,11 @@ public class SchedulerAlarmEvent extends LensEvent { return jobHandle.getHandleIdString(); } + @Override + public String toString() { + return "Job Handle : " + jobHandle + ", Nominal Time :" + nominalTime + ", type : " + type; + } + /** * Event type to know what kind of operations we want. */ http://git-wip-us.apache.org/repos/asf/lens/blob/d9911f61/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java index 369885d..ba12040 100644 --- a/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java @@ -118,7 +118,9 @@ public class EventServiceImpl extends AbstractService implements LensEventServic */ private final class EventHandler implements Runnable { - /** The event. */ + /** + * The event. + */ final LensEvent event; /** @@ -180,6 +182,18 @@ public class EventServiceImpl extends AbstractService implements LensEventServic } @Override + public void notifyEventSync(LensEvent event) throws LensException { + if (getServiceState() != STATE.STARTED) { + throw new LensException("Event service is not in STARTED state. Current state is " + getServiceState()); + } + if (event == null) { + return; + } + // Call the run() method directly and not submit to Executor Service. + new EventHandler(event).run(); + } + + @Override public HealthStatus getHealthStatus() { return (this.getServiceState().equals(STATE.STARTED) && !eventHandlerPool.isShutdown() http://git-wip-us.apache.org/repos/asf/lens/blob/d9911f61/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java index 6487323..269acd9 100644 --- a/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java +++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java @@ -18,6 +18,9 @@ package org.apache.lens.server.scheduler; +import java.util.ArrayList; +import java.util.List; + import org.apache.lens.api.scheduler.SchedulerJobHandle; import org.apache.lens.api.scheduler.XFrequency; import org.apache.lens.api.scheduler.XFrequencyEnum; @@ -36,6 +39,8 @@ import org.joda.time.DateTime; import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; /** @@ -53,6 +58,11 @@ public class AlarmService extends AbstractService implements LensService { public static final String LENS_JOBS = "LensJobs"; public static final String ALARM_SERVICE = "AlarmService"; + private static final String JOB_HANDLE = "jobHandle"; + private static final String SHOULD_WAIT = "shouldWaitForProcessing"; + @Getter + @Setter + private boolean shouldWaitForScheduleEventProcessing = false; private Scheduler scheduler; @@ -116,6 +126,15 @@ public class AlarmService extends AbstractService implements LensService { } } + public List<JobExecutionContext> getCurrentlyExecutingJobs() { + try { + return scheduler.getCurrentlyExecutingJobs(); + } catch (SchedulerException e) { + log.error("Failed to get currently executing jobs"); + } + return new ArrayList<>(); + } + /** * This method can be used by any consumer who wants to receive notifications during a time range at a given * frequency. @@ -132,8 +151,8 @@ public class AlarmService extends AbstractService implements LensService { public void schedule(DateTime start, DateTime end, XFrequency frequency, String jobHandle) throws LensException { // accept the schedule and then keep on sending the notifications for that schedule JobDataMap map = new JobDataMap(); - map.put("jobHandle", jobHandle); - + map.put(JOB_HANDLE, jobHandle); + map.put(SHOULD_WAIT, shouldWaitForScheduleEventProcessing); JobDetail job = JobBuilder.newJob(LensJob.class).withIdentity(jobHandle, LENS_JOBS).usingJobData(map).build(); Trigger trigger; @@ -230,24 +249,37 @@ public class AlarmService extends AbstractService implements LensService { public static class LensJob implements Job { + private void notifyEventService(SchedulerAlarmEvent alarmEvent, boolean shouldWait) + throws LensException, InterruptedException { + LensEventService eventService = LensServices.get().getService(LensEventService.NAME); + if (shouldWait) { + eventService.notifyEventSync(alarmEvent); + } else { + eventService.notifyEvent(alarmEvent); + } + } + @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { JobDataMap data = jobExecutionContext.getMergedJobDataMap(); DateTime nominalTime = new DateTime(jobExecutionContext.getScheduledFireTime()); - SchedulerJobHandle jobHandle = SchedulerJobHandle.fromString(data.getString("jobHandle")); + SchedulerJobHandle jobHandle = SchedulerJobHandle.fromString(data.getString(JOB_HANDLE)); + boolean shouldWait = data.getBoolean(SHOULD_WAIT); SchedulerAlarmEvent alarmEvent = new SchedulerAlarmEvent(jobHandle, nominalTime, SchedulerAlarmEvent.EventType.SCHEDULE, null); try { - LensEventService eventService = LensServices.get().getService(LensEventService.NAME); - eventService.notifyEvent(alarmEvent); + notifyEventService(alarmEvent, shouldWait); if (jobExecutionContext.getNextFireTime() == null) { - eventService - .notifyEvent(new SchedulerAlarmEvent(jobHandle, nominalTime, SchedulerAlarmEvent.EventType.EXPIRE, null)); + SchedulerAlarmEvent expireEvent = (new SchedulerAlarmEvent(jobHandle, nominalTime, + SchedulerAlarmEvent.EventType.EXPIRE, null)); + notifyEventService(expireEvent, shouldWait); } } catch (LensException e) { log.error("Failed to notify SchedulerAlarmEvent for jobHandle: {} and scheduleTime: {}", jobHandle.getHandleIdString(), nominalTime.toString()); throw new JobExecutionException("Failed to notify alarmEvent", e); + } catch (InterruptedException e) { + log.error("Job execution tread interrupted", e); } } } http://git-wip-us.apache.org/repos/asf/lens/blob/d9911f61/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java index 323e053..9cee0c2 100644 --- a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java @@ -74,6 +74,7 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe @Getter @VisibleForTesting protected SchedulerDAO schedulerDAO; + @Getter private AlarmService alarmService; /** http://git-wip-us.apache.org/repos/asf/lens/blob/d9911f61/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java b/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java index 94011d2..31b6625 100644 --- a/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java +++ b/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java @@ -543,6 +543,38 @@ public class TestEventService { "DummyAsncEventListener_AsyncThread-5"))); } + /** + * Test synchronous events + * @throws Exception + */ + @Test + public void testNotifySync() throws Exception { + service.addListenerForType(new TestEventHandler(), TestEvent.class); + TestEvent testEvent = new TestEvent("ID"); + service.notifyEventSync(testEvent); + assertTrue(testEvent.processed); + } + + private static class TestEvent extends LensEvent{ + String id; + boolean processed = false; + public TestEvent(String id) { + super(System.currentTimeMillis()); + this.id = id; + } + @Override + public String getEventId() { + return id; + } + } + + private static class TestEventHandler extends AsyncEventListener<TestEvent> { + + @Override + public void process(TestEvent event) { + event.processed = true; + } + } private static class DummyAsncEventListener extends AsyncEventListener<QuerySuccess> { public DummyAsncEventListener(){ super(5); //core pool = 5 @@ -552,5 +584,4 @@ public class TestEventService { throw new RuntimeException("Simulated Exception"); } } - }
