Repository: lens Updated Branches: refs/heads/master 68c5267f0 -> 92456effb
LENS-987 : Add AlarmService for scheduling time based queries Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/92456eff Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/92456eff Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/92456eff Branch: refs/heads/master Commit: 92456effb9508d4578894a82c1f2a419fe504d97 Parents: 68c5267 Author: Lavkesh Lahngir <[email protected]> Authored: Mon Jun 13 17:17:04 2016 +0530 Committer: Amareshwari Sriramadasu <[email protected]> Committed: Mon Jun 13 17:17:04 2016 +0530 ---------------------------------------------------------------------- lens-server-api/pom.xml | 4 + .../server/api/events/SchedulerAlarmEvent.java | 51 +++++ lens-server/pom.xml | 4 + .../org/apache/lens/server/LensServices.java | 7 +- .../lens/server/metrics/MetricsServiceImpl.java | 2 + .../notification/services/AlarmService.java | 220 +++++++++++++++++++ .../notification/services/AlarmServiceTest.java | 180 +++++++++++++++ pom.xml | 6 + 8 files changed, 473 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/92456eff/lens-server-api/pom.xml ---------------------------------------------------------------------- diff --git a/lens-server-api/pom.xml b/lens-server-api/pom.xml index 5508fb9..b59c249 100644 --- a/lens-server-api/pom.xml +++ b/lens-server-api/pom.xml @@ -105,5 +105,9 @@ <groupId>org.slf4j</groupId> <artifactId>jcl-over-slf4j</artifactId> </dependency> + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/lens/blob/92456eff/lens-server-api/src/main/java/org/apache/lens/server/api/events/SchedulerAlarmEvent.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/events/SchedulerAlarmEvent.java b/lens-server-api/src/main/java/org/apache/lens/server/api/events/SchedulerAlarmEvent.java new file mode 100644 index 0000000..2bcf0d6 --- /dev/null +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/events/SchedulerAlarmEvent.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lens.server.api.events; + +import org.apache.lens.api.query.SchedulerJobHandle; + +import org.joda.time.DateTime; + +import lombok.Data; + +/** + * This event is triggered by the AlarmService whenever a scheduled query needs to be scheduled. + */ +@Data +public class SchedulerAlarmEvent extends LensEvent { + + /** + * jobHandle for which the alarm needs to be triggered. + */ + private SchedulerJobHandle jobHandle; + + private DateTime nominalTime; + + public SchedulerAlarmEvent(SchedulerJobHandle jobHandle, DateTime nominalTime) { + super(nominalTime.getMillis()); + this.jobHandle = jobHandle; + this.nominalTime = nominalTime; + } + + @Override + public String getEventId() { + return jobHandle.getHandleIdString(); + } + +} http://git-wip-us.apache.org/repos/asf/lens/blob/92456eff/lens-server/pom.xml ---------------------------------------------------------------------- diff --git a/lens-server/pom.xml b/lens-server/pom.xml index b73cd05..2294b2a 100644 --- a/lens-server/pom.xml +++ b/lens-server/pom.xml @@ -242,6 +242,10 @@ <artifactId>guava</artifactId> </dependency> <dependency> + <groupId>org.quartz-scheduler</groupId> + <artifactId>quartz</artifactId> + </dependency> + <dependency> <groupId>org.apache.lens</groupId> <artifactId>lens-server-api</artifactId> <type>test-jar</type> http://git-wip-us.apache.org/repos/asf/lens/blob/92456eff/lens-server/src/main/java/org/apache/lens/server/LensServices.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/LensServices.java b/lens-server/src/main/java/org/apache/lens/server/LensServices.java index 10ec8b7..f6f2f36 100644 --- a/lens-server/src/main/java/org/apache/lens/server/LensServices.java +++ b/lens-server/src/main/java/org/apache/lens/server/LensServices.java @@ -20,7 +20,10 @@ package org.apache.lens.server; import static org.apache.lens.server.api.LensConfConstants.*; -import java.io.*; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.lang.reflect.Constructor; import java.util.*; import java.util.concurrent.*; @@ -34,6 +37,7 @@ import org.apache.lens.server.api.metrics.MetricsService; import org.apache.lens.server.metrics.MetricsServiceImpl; import org.apache.lens.server.model.LogSegregationContext; import org.apache.lens.server.model.MappedDiagnosticLogSegregationContext; +import org.apache.lens.server.scheduler.notification.services.AlarmService; import org.apache.lens.server.session.LensSessionImpl; import org.apache.lens.server.stats.StatisticsService; import org.apache.lens.server.user.UserConfigLoaderFactory; @@ -199,6 +203,7 @@ public class LensServices extends CompositeService implements ServiceProvider { UserConfigLoaderFactory.init(conf); // Add default services addService(cliService); + addService(new AlarmService(AlarmService.NAME)); addService(new EventServiceImpl(LensEventService.NAME)); addService(new MetricsServiceImpl(MetricsService.NAME)); addService(new StatisticsService(StatisticsService.STATS_SVC_NAME)); http://git-wip-us.apache.org/repos/asf/lens/blob/92456eff/lens-server/src/main/java/org/apache/lens/server/metrics/MetricsServiceImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/metrics/MetricsServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/metrics/MetricsServiceImpl.java index 6852265..1e8d540 100644 --- a/lens-server/src/main/java/org/apache/lens/server/metrics/MetricsServiceImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/metrics/MetricsServiceImpl.java @@ -49,6 +49,7 @@ import org.apache.lens.server.healthcheck.LensServiceHealthCheck; import org.apache.lens.server.query.QueryExecutionServiceImpl; import org.apache.lens.server.quota.QuotaServiceImpl; import org.apache.lens.server.scheduler.SchedulerServiceImpl; +import org.apache.lens.server.scheduler.notification.services.AlarmService; import org.apache.lens.server.session.DatabaseResourceService; import org.apache.lens.server.session.HiveSessionService; @@ -267,6 +268,7 @@ public class MetricsServiceImpl extends AbstractService implements MetricsServic healthCheck.register(QuotaServiceImpl.NAME, new LensServiceHealthCheck(QuotaServiceImpl.NAME)); healthCheck.register(MetricsServiceImpl.NAME, new LensServiceHealthCheck(MetricsServiceImpl.NAME)); healthCheck.register(EventServiceImpl.NAME, new LensServiceHealthCheck(EventServiceImpl.NAME)); + healthCheck.register(AlarmService.NAME, new LensServiceHealthCheck(AlarmService.NAME)); initCounters(); timeBetweenPolls = hiveConf.getInt(LensConfConstants.REPORTING_PERIOD, 10); http://git-wip-us.apache.org/repos/asf/lens/blob/92456eff/lens-server/src/main/java/org/apache/lens/server/scheduler/notification/services/AlarmService.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/notification/services/AlarmService.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/notification/services/AlarmService.java new file mode 100644 index 0000000..f575bc8 --- /dev/null +++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/notification/services/AlarmService.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lens.server.scheduler.notification.services; + +import org.apache.lens.api.query.SchedulerJobHandle; +import org.apache.lens.api.scheduler.XFrequency; +import org.apache.lens.api.scheduler.XFrequencyEnum; +import org.apache.lens.server.LensServices; +import org.apache.lens.server.api.LensService; +import org.apache.lens.server.api.error.LensException; +import org.apache.lens.server.api.events.LensEventService; +import org.apache.lens.server.api.events.SchedulerAlarmEvent; +import org.apache.lens.server.api.health.HealthStatus; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hive.service.AbstractService; + +import org.joda.time.DateTime; +import org.quartz.*; +import org.quartz.impl.StdSchedulerFactory; + +import lombok.extern.slf4j.Slf4j; + +/** + * This service is used primarily by Scheduler to get alarm notifications for scheduled queries. + * + * As a schedule this service accepts start time, frequency, end time and timeZone. It also requires the + * {@link org.apache.lens.api.query.SchedulerJobHandle} which it sends as part of the + * {@link org.apache.lens.server.api.events.SchedulerAlarmEvent} to inform the scheduler about the job for which + * job the notification has been generated. + */ +@Slf4j +public class AlarmService extends AbstractService implements LensService { + + public static final String NAME = "alarm-service"; + + private Scheduler scheduler; + + /** + * True if the service started properly and is running fine, false otherwise. + */ + private boolean isHealthy = true; + + /** + * Contains the reason if service is not healthy. + */ + private String healthCause; + + /** + * Creates a new instance of AlarmService. + * + * @param name the name + */ + public AlarmService(String name) { + super(name); + } + + @Override + public HealthStatus getHealthStatus() { + return isHealthy + ? new HealthStatus(isHealthy, "Alarm service is healthy.") + : new HealthStatus(isHealthy, healthCause); + } + + public synchronized void init(HiveConf hiveConf) { + super.init(hiveConf); + try { + this.scheduler = StdSchedulerFactory.getDefaultScheduler(); + } catch (SchedulerException e) { + isHealthy = false; + healthCause = "Failed to initialize the Quartz Scheduler for AlarmService."; + log.error(healthCause, e); + } + } + + public synchronized void start() { + try { + scheduler.start(); + log.info("Alarm service started successfully!"); + } catch (SchedulerException e) { + isHealthy = false; + healthCause = "Failed to start the Quartz Scheduler for AlarmService."; + log.error(healthCause, e); + } + } + + @Override + public synchronized void stop() { + try { + scheduler.shutdown(); + log.info("Alarm Service stopped successfully."); + } catch (SchedulerException e) { + log.error("Failed to shut down the Quartz Scheduler for AlarmService.", e); + } + } + + /** + * This method can be used by any consumer who wants to receive notifications during a time range at a given + * frequency. + * + * This method is intended to be used by LensScheduler to subscribe for time based notifications to schedule queries. + * On receiving a job to be scheduled LensScheduler will subscribe to all triggers required for the job, including + * AlarmService for time based triggers. + * + * @param start start time for notifications + * @param end end time for notifications + * @param frequency Frequency to determine the frequency at which notification should be sent. + * @param jobHandle Must be a unique jobHanlde across all consumers + */ + public void schedule(DateTime start, DateTime end, XFrequency frequency, String jobHandle) throws LensException { + // accept the schedule and then keep on sending the notifications for that schedule + JobDataMap map = new JobDataMap(); + map.put("jobHandle", jobHandle); + + JobDetail job = JobBuilder.newJob(LensJob.class).withIdentity(jobHandle, "LensJobs") + .usingJobData(map).build(); + + Trigger trigger; + if (frequency.getEnum() != null) { //for enum expression: create a trigger using calendar interval + CalendarIntervalScheduleBuilder scheduleBuilder = CalendarIntervalScheduleBuilder.calendarIntervalSchedule() + .withInterval(getTimeInterval(frequency.getEnum()), getTimeUnit(frequency.getEnum())) + .withMisfireHandlingInstructionIgnoreMisfires(); + trigger = TriggerBuilder.newTrigger() + .withIdentity(jobHandle, "AlarmService") + .startAt(start.toDate()) + .endAt(end.toDate()) + .withSchedule(scheduleBuilder) + .build(); + } else { // for cron expression create a cron trigger + trigger = TriggerBuilder.newTrigger() + .withIdentity(jobHandle, "AlarmService") + .withSchedule(CronScheduleBuilder.cronSchedule(frequency.getCronExpression())) + .build(); + } + + // Tell quartz to run the job using our trigger + try { + scheduler.scheduleJob(job, trigger); + } catch (SchedulerException e) { + log.error("Error scheduling job with jobHandle: {}", jobHandle); + throw new LensException("Failed to schedule job with jobHandle: " + jobHandle, e); + } + } + + + private int getTimeInterval(XFrequencyEnum frequencyEnum) { + // since quarterly is not supported natively, we express it as 3 months + return frequencyEnum == XFrequencyEnum.QUARTERLY ? 3 : 1; + } + + + // Maps the timeunit in entity specification to the one in Quartz DateBuilder + private DateBuilder.IntervalUnit getTimeUnit(XFrequencyEnum frequencyEnum) { + switch (frequencyEnum) { + + case DAILY: + return DateBuilder.IntervalUnit.DAY; + + case WEEKLY: + return DateBuilder.IntervalUnit.WEEK; + + case MONTHLY: + return DateBuilder.IntervalUnit.MONTH; + + case QUARTERLY: + return DateBuilder.IntervalUnit.MONTH; + + case YEARLY: + return DateBuilder.IntervalUnit.YEAR; + + default: + throw new IllegalArgumentException("Invalid frequency enum expression: " + frequencyEnum.name()); + } + } + + public boolean unSchedule(SchedulerJobHandle jobHandle) throws LensException { + // stop sending notifications for this job handle + try { + return scheduler.deleteJob(JobKey.jobKey(jobHandle.getHandleIdString(), "LensScheduler")); + } catch (SchedulerException e) { + log.error("Failed to remove alarm triggers for job with jobHandle: " + jobHandle, e); + throw new LensException("Failed to remove alarm triggers for job with jobHandle: " + jobHandle, e); + } + } + + public static class LensJob implements Job { + + @Override + public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { + JobDataMap data = jobExecutionContext.getMergedJobDataMap(); + DateTime nominalTime = new DateTime(jobExecutionContext.getScheduledFireTime()); + SchedulerJobHandle jobHandle = SchedulerJobHandle.fromString(data.getString("jobHandle")); + SchedulerAlarmEvent alarmEvent = new SchedulerAlarmEvent(jobHandle, nominalTime); + try { + LensEventService eventService = LensServices.get().getService(LensEventService.NAME); + eventService.notifyEvent(alarmEvent); + } catch (LensException e) { + log.error("Failed to notify SchedulerAlarmEvent for jobHandle: {} and nominalTime: {}", + jobHandle.getHandleIdString(), nominalTime.toString(), e); + throw new JobExecutionException("Failed to notify alarmEvent", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/lens/blob/92456eff/lens-server/src/test/java/org/apache/lens/server/scheduler/notification/services/AlarmServiceTest.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/scheduler/notification/services/AlarmServiceTest.java b/lens-server/src/test/java/org/apache/lens/server/scheduler/notification/services/AlarmServiceTest.java new file mode 100644 index 0000000..838b9b8 --- /dev/null +++ b/lens-server/src/test/java/org/apache/lens/server/scheduler/notification/services/AlarmServiceTest.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lens.server.scheduler.notification.services; + +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +import java.util.*; + +import org.apache.lens.api.query.SchedulerJobHandle; +import org.apache.lens.api.scheduler.XFrequency; +import org.apache.lens.api.scheduler.XFrequencyEnum; +import org.apache.lens.server.EventServiceImpl; +import org.apache.lens.server.LensServerConf; +import org.apache.lens.server.LensServices; +import org.apache.lens.server.api.LensConfConstants; +import org.apache.lens.server.api.error.LensException; +import org.apache.lens.server.api.events.LensEventListener; +import org.apache.lens.server.api.events.LensEventService; +import org.apache.lens.server.api.events.SchedulerAlarmEvent; + +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import org.quartz.*; +import org.quartz.impl.StdSchedulerFactory; + +import org.testng.Assert; +import org.testng.annotations.*; + +import lombok.extern.slf4j.Slf4j; +/** + * Tests for AlarmService. + */ +@Slf4j +@Test(groups = "unit-test") +public class AlarmServiceTest { + + private AlarmService alarmService; + + private EventServiceImpl eventService; + + private static List<SchedulerAlarmEvent> events = new LinkedList<>(); + + private static volatile int counter = 0; + + private final LensEventListener<SchedulerAlarmEvent> alarmEventListener = new + LensEventListener<SchedulerAlarmEvent>() { + + @Override + public void onEvent(SchedulerAlarmEvent event) { + events.add(event); + } + }; + + @BeforeMethod + public void initializeEventsList() { + events = new LinkedList<>(); + } + + @BeforeTest + public void setUp() { + System.setProperty(LensConfConstants.CONFIG_LOCATION, "target/test-classes/"); + LensServices.get().init(LensServerConf.getHiveConf()); + LensServices.get().start(); + eventService = LensServices.get().getService(LensEventService.NAME); + assertNotNull(eventService); + alarmService = LensServices.get().getService(AlarmService.NAME); + assertNotNull(alarmService); + eventService.addListenerForType(alarmEventListener, SchedulerAlarmEvent.class); + } + + /** + * This test generally tests the basic understanding and assumptions about quartz framework with a sample job. + * + * @throws SchedulerException + * @throws InterruptedException + */ + @Test + public void testCoreExecution() throws SchedulerException, InterruptedException { + SchedulerFactory sf = new StdSchedulerFactory(); + Scheduler scheduler = sf.getScheduler(); + scheduler.start(); + try { + // prepare a sample Job + JobDetail job = JobBuilder.newJob(TestJob.class).withIdentity("random", "group").build(); + scheduler.scheduleJob(job, getPastPerSecondsTrigger()); + Thread.sleep(2000); + Assert.assertEquals(counter, 10); + } finally { + scheduler.shutdown(); + } + } + + private CalendarIntervalScheduleBuilder getPerSecondCalendar() { + return CalendarIntervalScheduleBuilder.calendarIntervalSchedule() + .withInterval(1, DateBuilder.IntervalUnit.SECOND) + .withMisfireHandlingInstructionFireAndProceed(); + } + private Trigger getPastPerSecondsTrigger() { + CalendarIntervalScheduleBuilder scheduleBuilder = getPerSecondCalendar(); + Date start = new Date(); + start = new Date(start.getTime() - 10000); + Date end = new Date(); + + return TriggerBuilder.newTrigger().withIdentity("trigger1", "group1") + .startAt(start).endAt(end).withSchedule(scheduleBuilder).build(); + } + + + @PersistJobDataAfterExecution + @DisallowConcurrentExecution + public static class TestJob implements Job { + @Override + public void execute(JobExecutionContext context) throws JobExecutionException { + AlarmServiceTest.counter++; + } + } + + @Test + public void testAlarmServiceEnums() throws InterruptedException, LensException { + DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-mm-dd"); + DateTime start = formatter.parseDateTime("2016-03-03"); + DateTime end = formatter.parseDateTime("2016-03-06"); + SchedulerJobHandle jobHandle = new SchedulerJobHandle(UUID.randomUUID()); + XFrequency frequency = new XFrequency(); + frequency.setEnum(XFrequencyEnum.DAILY); + alarmService.schedule(start, end, frequency, jobHandle.toString()); + Thread.sleep(2000); // give chance to the event listener to process the data + int count = 0; + for (SchedulerAlarmEvent event : events) { + if (event.getJobHandle().equals(jobHandle)) { + count++; + } + } + Assert.assertEquals(count, 3); + DateTime expectedDate = start; + Set<DateTime> actualSet = new HashSet<>(); + for (SchedulerAlarmEvent e : events) { + actualSet.add(e.getNominalTime()); + } + + for (int i = 0; i < 3; i++) { + Assert.assertTrue(actualSet.contains(expectedDate)); + expectedDate = expectedDate.plusDays(1); + } + } + + @Test + public void testAlarmServiceCronExpressions() throws InterruptedException, LensException { + DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-mm-dd"); + DateTime start = formatter.parseDateTime("2016-03-03"); + DateTime end = formatter.parseDateTime("2016-03-06"); + SchedulerJobHandle jobHandle = new SchedulerJobHandle(UUID.randomUUID()); + System.out.println("jobHandle = " + jobHandle); + XFrequency frequency = new XFrequency(); + frequency.setCronExpression("0/1 * * * * ?"); + alarmService.schedule(start, end, frequency, jobHandle.toString()); + Thread.sleep(2000); + // Assert that the events are fired and at per second interval. + assertTrue(events.size() > 1); + } +} http://git-wip-us.apache.org/repos/asf/lens/blob/92456eff/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 1c3c9da..9695126 100644 --- a/pom.xml +++ b/pom.xml @@ -57,6 +57,7 @@ <commons.lang.version>2.4</commons.lang.version> <commons.collections.version>3.2.1</commons.collections.version> <joda.time.version>2.0</joda.time.version> + <quartz.version>2.2.2</quartz.version> <guava.version>13.0.1</guava.version> <lombok.version>1.16.6</lombok.version> <lombok.maven.plugin.version>1.16.4.1</lombok.maven.plugin.version> @@ -823,6 +824,11 @@ <dependencyManagement> <dependencies> <dependency> + <groupId>org.quartz-scheduler</groupId> + <artifactId>quartz</artifactId> + <version>${quartz.version}</version> + </dependency> + <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>${lombok.version}</version>
