http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java new file mode 100644 index 0000000..4883fe7 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.notification.service.event; + + +import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.notification.service.NotificationServicesRegistry; +import org.apache.falcon.state.ID; +import org.apache.hadoop.fs.Path; + +/** + * An event generated by {@link org.apache.falcon.notification.service.impl.DataAvailabilityService} + * indicating availability or non-availability of a dataset. 
+ */ +public class DataEvent implements Event { + private final ID callbackID; + private Path dataLocation; + private LocationType dataType; + private STATUS status; + + /** + * Enumerates the status of data. + */ + public enum STATUS { + AVAILABLE, + UNAVAILABLE + } + + public DataEvent(ID callbackID, Path location, LocationType locType, STATUS availability) { + this.callbackID = callbackID; + this.dataLocation = location; + this.dataType = locType; + this.status = availability; + } + + public STATUS getStatus() { + return status; + } + + public void setStatus(STATUS availability) { + this.status = availability; + } + + public Path getDataLocation() { + return dataLocation; + } + + public LocationType getDataType() { + return dataType; + } + + @Override + public NotificationServicesRegistry.SERVICE getSource() { + return NotificationServicesRegistry.SERVICE.DATA; + } + + @Override + public ID getTarget() { + return callbackID; + } +}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java new file mode 100644 index 0000000..140973b --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.notification.service.event; + +import org.apache.falcon.notification.service.NotificationServicesRegistry; +import org.apache.falcon.state.ID; + +/** + * An events that are generated by notification services must implement this interface. + */ +public interface Event { + + /** + * @return The service that generated this event + */ + NotificationServicesRegistry.SERVICE getSource(); + + /** + * @return ID of the notification handler for which this event was meant for. 
+ */ + ID getTarget(); +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java new file mode 100644 index 0000000..c587343 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.notification.service.event; + +import org.apache.falcon.notification.service.NotificationServicesRegistry; +import org.apache.falcon.state.ID; +import org.apache.oozie.client.WorkflowJob; +import org.joda.time.DateTime; + +/** + * An event generated by {@link org.apache.falcon.notification.service.impl.JobCompletionService} + * indicating completion of a Job. 
+ */ +public class JobCompletedEvent implements Event { + + private WorkflowJob.Status status; + private final ID callbackID; + private DateTime endTime; + + public JobCompletedEvent(ID callbackID, WorkflowJob.Status jobStatus, DateTime end) { + this.callbackID = callbackID; + this.status = jobStatus; + this.endTime = end; + } + + public WorkflowJob.Status getStatus() { + return status; + } + + @Override + public NotificationServicesRegistry.SERVICE getSource() { + return NotificationServicesRegistry.SERVICE.JOB_COMPLETION; + } + + @Override + public ID getTarget() { + return callbackID; + } + + public DateTime getEndTime() { + return endTime; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java new file mode 100644 index 0000000..55023e7 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.notification.service.event; + +import org.apache.falcon.notification.service.NotificationServicesRegistry; +import org.apache.falcon.state.ID; +import org.joda.time.DateTime; + +/** + * An event generated by {@link org.apache.falcon.notification.service.impl.SchedulerService} + * indicating if an instance was scheduled for execution. + */ +public class JobScheduledEvent implements Event { + private final ID callbackID; + private String externalID; + private STATUS status; + private DateTime startTime; + + public JobScheduledEvent(ID callbackID, STATUS status) { + this.callbackID = callbackID; + this.status = status; + } + + public String getExternalID() { + return externalID; + } + + public void setExternalID(String externalID) { + this.externalID = externalID; + } + + @Override + public NotificationServicesRegistry.SERVICE getSource() { + return NotificationServicesRegistry.SERVICE.JOB_SCHEDULE; + } + + @Override + public ID getTarget() { + return callbackID; + } + + /** + * @return - The status of the scheduled DAG/Job. + */ + public STATUS getStatus() { + return status; + } + + + public DateTime getStartTime() { + return startTime; + } + + public void setStartTime(DateTime startTime) { + this.startTime = startTime; + } + + /** + * Enumeration of possible statuses of a DAG/Job. 
+ */ + public enum STATUS { + FAILED, + SUCCESSFUL + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java new file mode 100644 index 0000000..7ec4de6 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.notification.service.event; + +import org.apache.falcon.notification.service.NotificationServicesRegistry; +import org.apache.falcon.state.ID; +import org.joda.time.DateTime; + +/** + * An event generated by {@link org.apache.falcon.notification.service.impl.AlarmService} + * indicating that a given time duration has elapsed. 
+ */ +public class TimeElapsedEvent implements Event { + private DateTime startTime; + private DateTime endTime; + private DateTime instanceTime; + private final ID callbackID; + + public DateTime getInstanceTime() { + return instanceTime; + } + + public DateTime getStartTime() { + return startTime; + } + + public DateTime getEndTime() { + return endTime; + } + + public TimeElapsedEvent(ID callbackID, DateTime start, DateTime end, DateTime instTime) { + this.callbackID = callbackID; + this.startTime = start; + this.endTime = end; + this.instanceTime = instTime; + } + + @Override + public NotificationServicesRegistry.SERVICE getSource() { + return NotificationServicesRegistry.SERVICE.TIME; + } + + @Override + public ID getTarget() { + return callbackID; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/AlarmService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/AlarmService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/AlarmService.java new file mode 100644 index 0000000..cccdeac --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/AlarmService.java @@ -0,0 +1,326 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.notification.service.impl; + +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.exception.NotificationServiceException; +import org.apache.falcon.execution.NotificationHandler; +import org.apache.falcon.execution.SchedulerUtil; +import org.apache.falcon.notification.service.FalconNotificationService; +import org.apache.falcon.notification.service.event.TimeElapsedEvent; +import org.apache.falcon.notification.service.request.NotificationRequest; +import org.apache.falcon.notification.service.request.AlarmRequest; +import org.apache.falcon.state.ID; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.quartz.CalendarIntervalTrigger; +import org.quartz.DateBuilder; +import org.quartz.Job; +import org.quartz.JobDataMap; +import org.quartz.JobDetail; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.quartz.JobKey; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.TriggerKey; +import org.quartz.impl.StdSchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.quartz.CalendarIntervalScheduleBuilder.calendarIntervalSchedule; +import static 
org.quartz.JobBuilder.newJob; +import static org.quartz.TriggerBuilder.newTrigger; + +/** + * This notification service notifies {@link NotificationHandler} when requested time + * event has occurred. The class users to subscribe to frequency based, cron based or some calendar based time events. + */ +public class AlarmService implements FalconNotificationService { + + private static final Logger LOG = LoggerFactory.getLogger(AlarmService.class); + + private Map<ID, TriggerKey> notifications = new HashMap<ID, TriggerKey>(); + private static ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10); + private Scheduler scheduler; + + @Override + public void init() throws FalconException { + try { + scheduler = StdSchedulerFactory.getDefaultScheduler(); + scheduler.start(); + } catch (SchedulerException e) { + throw new FalconException(e); + } + } + + @Override + public void register(NotificationRequest notificationRequest) throws NotificationServiceException { + LOG.info("Registering alarm notification for " + notificationRequest.getCallbackId()); + AlarmRequest request = (AlarmRequest) notificationRequest; + DateTime currentTime = DateTime.now(); + DateTime nextStartTime = request.getStartTime(); + DateTime endTime; + if (request.getEndTime().isBefore(currentTime)) { + endTime = request.getEndTime(); + } else { + endTime = currentTime; + } + // Handle past events. + // TODO : Quartz doesn't seem to support running jobs for past events. + // TODO : Remove the handling of past events when that support is added. 
+ if (request.getStartTime().isBefore(currentTime)) { + + List<Date> instanceTimes = EntityUtil.getInstanceTimes(request.getStartTime().toDate(), + request.getFrequency(), request.getTimeZone(), request.getStartTime().toDate(), + endTime.toDate()); + if (instanceTimes != null && !instanceTimes.isEmpty()) { + Date lastInstanceTime = instanceTimes.get(instanceTimes.size() - 1); + nextStartTime = new DateTime(lastInstanceTime.getTime() + + SchedulerUtil.getFrequencyInMillis(new DateTime(lastInstanceTime), request.getFrequency())); + // Introduce some delay to allow for rest of the registration to complete. + LOG.debug("Triggering events for past from {} till {}", instanceTimes.get(0), lastInstanceTime); + executor.schedule(new CatchupJob(request, instanceTimes), 1, TimeUnit.SECONDS); + } + } + // All past events have been scheduled. Nothing to schedule in the future. + if (request.getEndTime().isBefore(currentTime)) { + return; + } + LOG.debug("Scheduling to trigger events from {} to {} with frequency {}", nextStartTime, request.getEndTime(), + request.getFrequency()); + // Schedule future events using Quartz + CalendarIntervalTrigger trigger = newTrigger() + .withIdentity(notificationRequest.getCallbackId().toString(), "Falcon") + .startAt(nextStartTime.toDate()) + .endAt(request.getEndTime().toDate()) + .withSchedule( + calendarIntervalSchedule() + .withInterval(request.getFrequency().getFrequencyAsInt(), + getTimeUnit(request.getFrequency().getTimeUnit())) + .withMisfireHandlingInstructionFireAndProceed()) + .build(); + + // define the job and tie it to our Job class + JobDetail job = newJob(FalconProcessJob.class) + .withIdentity(getJobKey(notificationRequest.getCallbackId().toString())) + .setJobData(getJobDataMap((AlarmRequest) notificationRequest)) + .build(); + notifications.put(notificationRequest.getCallbackId(), trigger.getKey()); + // Tell quartz to run the job using our trigger + try { + scheduler.scheduleJob(job, trigger); + } catch (SchedulerException 
e) { + LOG.error("Error scheduling entity {}", trigger.getKey()); + throw new NotificationServiceException(e); + } + } + + // Maps the timeunit in entity specification to the one in Quartz DateBuilder + private DateBuilder.IntervalUnit getTimeUnit(Frequency.TimeUnit timeUnit) { + switch (timeUnit) { + case minutes: + return DateBuilder.IntervalUnit.MINUTE; + case hours: + return DateBuilder.IntervalUnit.HOUR; + case days: + return DateBuilder.IntervalUnit.DAY; + case months: + return DateBuilder.IntervalUnit.MONTH; + default: + throw new IllegalArgumentException("Invalid time unit " + timeUnit.name()); + } + } + + private JobKey getJobKey(String key) { + return new JobKey(key, "Falcon"); + } + + private JobDataMap getJobDataMap(AlarmRequest request) { + JobDataMap jobProps = new JobDataMap(); + jobProps.put("request", request); + + return jobProps; + } + + @Override + public void unregister(NotificationHandler handler, ID listenerID) throws NotificationServiceException { + try { + LOG.info("Removing time notification for handler {} with callbackID {}", handler, listenerID); + scheduler.unscheduleJob(notifications.get(listenerID)); + notifications.remove(listenerID); + } catch (SchedulerException e) { + throw new NotificationServiceException("Unable to deregister " + listenerID, e); + } + } + + @Override + public RequestBuilder createRequestBuilder(NotificationHandler handler, ID callbackID) { + return new AlarmRequestBuilder(handler, callbackID); + } + + @Override + public String getName() { + return "AlarmService"; + } + + @Override + public void destroy() throws FalconException { + try { + scheduler.shutdown(); + } catch (SchedulerException e) { + LOG.warn("Quartz Scheduler shutdown failed.", e); + } + + } + + // Generates a time elapsed event and invokes onEvent on the handler. 
+ private static void notifyHandler(AlarmRequest request, DateTime instanceTime) throws NotificationServiceException { + TimeElapsedEvent event = new TimeElapsedEvent(request.getCallbackId(), request.getStartTime(), + request.getEndTime(), instanceTime); + try { + LOG.info("Sending notification to {} with nominal time {} ", request.getCallbackId(), + event.getInstanceTime()); + request.getHandler().onEvent(event); + } catch (FalconException e) { + LOG.error("Unable to onEvent " + request.getCallbackId() + " for nominal time, " + instanceTime, e); + throw new NotificationServiceException(e); + } + } + + /** + * The Job that runs when a time trigger happens. + */ + public static class FalconProcessJob implements Job { + public FalconProcessJob() { + } + + @Override + public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { + LOG.debug("Quartz job called at : {}, Next fire time: {}", jobExecutionContext.getFireTime(), + jobExecutionContext.getNextFireTime()); + + AlarmRequest request = (AlarmRequest) jobExecutionContext.getJobDetail() + .getJobDataMap().get("request"); + DateTime instanceTime = new DateTime(jobExecutionContext.getScheduledFireTime(), + DateTimeZone.forTimeZone(request.getTimeZone())); + + try { + notifyHandler(request, instanceTime); + } catch (NotificationServiceException e) { + throw new JobExecutionException(e); + } + } + } + + // Quartz doesn't seem to be able to schedule past events. This job specifically handles that. + private static class CatchupJob implements Runnable { + + private final AlarmRequest request; + private final List<Date> instanceTimes; + + public CatchupJob(AlarmRequest request, List<Date> triggerTimes) { + this.request = request; + this.instanceTimes = triggerTimes; + } + + @Override + public void run() { + if (instanceTimes == null) { + return; + } + // Immediate notification for all past events. 
+ for(Date instanceTime : instanceTimes) { + DateTime nominalDateTime = new DateTime(instanceTime, DateTimeZone.forTimeZone(request.getTimeZone())); + try { + notifyHandler(request, nominalDateTime); + } catch (NotificationServiceException e) { + throw new RuntimeException(e); + } + } + } + } + + /** + * Builder that builds {@link AlarmRequest}. + */ + public static class AlarmRequestBuilder extends RequestBuilder<AlarmRequest> { + + private DateTime startTime; + private DateTime endTime; + private Frequency frequency; + private TimeZone timeZone; + + public AlarmRequestBuilder(NotificationHandler handler, ID callbackID) { + super(handler, callbackID); + } + + /** + * @param start of the timer + * @return This instance + */ + public AlarmRequestBuilder setStartTime(DateTime start) { + this.startTime = start; + return this; + } + + /** + * @param end of the timer + * @return This instance + */ + public AlarmRequestBuilder setEndTime(DateTime end) { + this.endTime = end; + return this; + } + + /** + * @param freq of the timer + * @return This instance + */ + public AlarmRequestBuilder setFrequency(Frequency freq) { + this.frequency = freq; + return this; + } + + /** + * @param timeZone + */ + public void setTimeZone(TimeZone timeZone) { + this.timeZone = timeZone; + } + + @Override + public AlarmRequest build() { + if (callbackId == null || startTime == null || endTime == null || frequency == null) { + throw new IllegalArgumentException("Missing one or more of the mandatory arguments:" + + " callbackId, startTime, endTime, frequency"); + } + return new AlarmRequest(handler, callbackId, startTime, endTime, frequency, timeZone); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java ---------------------------------------------------------------------- diff --git 
a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java
new file mode 100644
index 0000000..7ffb351
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java
@@ -0,0 +1,94 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.falcon.notification.service.impl;

import org.apache.falcon.FalconException;
import org.apache.falcon.exception.NotificationServiceException;
import org.apache.falcon.execution.NotificationHandler;
import org.apache.falcon.notification.service.FalconNotificationService;
import org.apache.falcon.notification.service.request.DataNotificationRequest;
import org.apache.falcon.notification.service.request.NotificationRequest;
import org.apache.falcon.state.ID;
import org.apache.hadoop.fs.Path;

/**
 * Notification service meant to tell a {@link NotificationHandler} when requested data
 * becomes available and, on time out, that it is unavailable.
 * Only the request builder is functional so far; the lifecycle and registration
 * methods are empty placeholders.
 * TODO : Complete/Modify this skeletal class
 */
public class DataAvailabilityService implements FalconNotificationService {

    @Override
    public String getName() {
        return "DataAvailabilityService";
    }

    @Override
    public void init() throws FalconException {
        // TODO : Implement this
    }

    @Override
    public void destroy() throws FalconException {

    }

    @Override
    public void register(NotificationRequest request) throws NotificationServiceException {
        // TODO : Implement this
    }

    @Override
    public void unregister(NotificationHandler handler, ID listenerID) {
        // TODO : Implement this
    }

    /**
     * @param handler    handler passed on to the builder
     * @param callbackID callback ID passed on to the builder
     * @return a new {@link DataRequestBuilder}
     */
    @Override
    public RequestBuilder createRequestBuilder(NotificationHandler handler, ID callbackID) {
        return new DataRequestBuilder(handler, callbackID);
    }

    /**
     * Builds {@link DataNotificationRequest}.
     */
    public static class DataRequestBuilder extends RequestBuilder<DataNotificationRequest> {
        private Path dataLocation;

        public DataRequestBuilder(NotificationHandler handler, ID callbackID) {
            super(handler, callbackID);
        }

        /**
         * @param location path of the data the request is about
         * @return This instance
         */
        public DataRequestBuilder setDataLocation(Path location) {
            this.dataLocation = location;
            return this;
        }

        /**
         * @return a new {@link DataNotificationRequest} for the configured location
         * @throws IllegalArgumentException if callbackId or dataLocation is missing
         */
        @Override
        public DataNotificationRequest build() {
            boolean complete = callbackId != null && dataLocation != null;
            if (!complete) {
                throw new IllegalArgumentException("Missing one or more of the mandatory arguments:"
                        + " callbackId, dataLocation");
            }
            return new DataNotificationRequest(handler, callbackId, dataLocation);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
new file
mode 100644 index 0000000..73a4199 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.notification.service.impl; + +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.exception.NotificationServiceException; +import org.apache.falcon.execution.NotificationHandler; +import org.apache.falcon.notification.service.FalconNotificationService; +import org.apache.falcon.notification.service.event.JobCompletedEvent; +import org.apache.falcon.notification.service.request.JobCompletionNotificationRequest; +import org.apache.falcon.notification.service.request.NotificationRequest; +import org.apache.falcon.service.Services; +import org.apache.falcon.state.ID; +import org.apache.falcon.workflow.WorkflowExecutionArgs; +import org.apache.falcon.workflow.WorkflowExecutionContext; +import org.apache.falcon.workflow.WorkflowExecutionListener; +import org.apache.falcon.workflow.WorkflowJobEndNotificationService; +import 
org.apache.falcon.workflow.engine.DAGEngineFactory; +import org.apache.oozie.client.WorkflowJob; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.TimeZone; + +/** + * This notification service notifies {@link NotificationHandler} when an external job + * completes. + */ +public class JobCompletionService implements FalconNotificationService, WorkflowExecutionListener { + + private static final Logger LOG = LoggerFactory.getLogger(JobCompletionService.class); + private static DateTimeZone utc = DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC")); + + private List<NotificationHandler> listeners = Collections.synchronizedList(new ArrayList<NotificationHandler>()); + + @Override + public void register(NotificationRequest notifRequest) throws NotificationServiceException { + if (notifRequest == null) { + throw new NotificationServiceException("Request object cannot be null"); + } + listeners.add(notifRequest.getHandler()); + JobCompletionNotificationRequest request = (JobCompletionNotificationRequest) notifRequest; + // Check if the job is already complete. + // If yes, send a notification synchronously. + // If not, we expect that this class will get notified when the job completes + // as this class is a listener to WorkflowJobEndNotificationService. 
+ if (request.getExternalId() != null && request.getCluster() != null) { + try { + Properties props = DAGEngineFactory.getDAGEngine(request.getCluster()) + .getConfiguration(request.getExternalId()); + WorkflowExecutionContext context = createContext(props); + if (context.hasWorkflowFailed()) { + onFailure(context); + } else if (context.hasWorkflowSucceeded()) { + onSuccess(context); + } + } catch (FalconException e) { + throw new NotificationServiceException(e); + } + } + } + + @Override + public void unregister(NotificationHandler handler, ID listenerID) { + listeners.remove(handler); + } + + @Override + public RequestBuilder createRequestBuilder(NotificationHandler handler, ID callbackID) { + return new JobCompletionRequestBuilder(handler, callbackID); + } + + @Override + public String getName() { + return "JobCompletionService"; + } + + @Override + public void init() throws FalconException { + LOG.debug("Registering to job end notification service"); + Services.get().<WorkflowJobEndNotificationService>getService( + WorkflowJobEndNotificationService.SERVICE_NAME).registerListener(this); + } + + @Override + public void destroy() throws FalconException { + + } + + @Override + public void onSuccess(WorkflowExecutionContext context) throws FalconException { + onEnd(context, WorkflowJob.Status.SUCCEEDED); + } + + @Override + public void onFailure(WorkflowExecutionContext context) throws FalconException { + onEnd(context, WorkflowJob.Status.FAILED); + } + + @Override + public void onStart(WorkflowExecutionContext context) throws FalconException { + // Do nothing + } + + @Override + public void onSuspend(WorkflowExecutionContext context) throws FalconException { + // Do nothing + } + + @Override + public void onWait(WorkflowExecutionContext context) throws FalconException { + // Do nothing + } + + private void onEnd(WorkflowExecutionContext context, WorkflowJob.Status status) throws FalconException { + JobCompletedEvent event = new 
JobCompletedEvent(constructCallbackID(context), status, getEndTime(context)); + for (NotificationHandler handler : listeners) { + LOG.debug("Notifying {} with event {}", handler, event.getTarget()); + handler.onEvent(event); + } + } + + private DateTime getEndTime(WorkflowExecutionContext context) throws FalconException { + return new DateTime(DAGEngineFactory.getDAGEngine(context.getClusterName()) + .info(context.getWorkflowId()).getEndTime()); + } + + // Constructs the callback ID from the details available in the context. + private ID constructCallbackID(WorkflowExecutionContext context) throws FalconException { + ID id = new ID(EntityType.valueOf(context.getEntityType()), context.getEntityName()); + id.setCluster(context.getClusterName()); + id.setInstanceTime(new DateTime(EntityUtil.parseDateUTC(context.getNominalTimeAsISO8601()), utc)); + return id; + } + + private WorkflowExecutionContext createContext(Properties props) { + // for backwards compatibility, read all args from properties + Map<WorkflowExecutionArgs, String> wfProperties = new HashMap<WorkflowExecutionArgs, String>(); + for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) { + String optionValue = props.getProperty(arg.getName()); + if (StringUtils.isNotEmpty(optionValue)) { + wfProperties.put(arg, optionValue); + } + } + + return WorkflowExecutionContext.create(wfProperties); + } + + /** + * Builds {@link JobCompletionNotificationRequest}. + */ + public static class JobCompletionRequestBuilder extends RequestBuilder<JobCompletionNotificationRequest> { + private String cluster; + private String externalId; + + public JobCompletionRequestBuilder(NotificationHandler handler, ID callbackID) { + super(handler, callbackID); + } + + /** + * @param clusterName + */ + public JobCompletionRequestBuilder setCluster(String clusterName) { + this.cluster = clusterName; + return this; + } + + /** + * @param id - The external job id for which job completion notification is requested. 
+ * @return + */ + public JobCompletionRequestBuilder setExternalId(String id) { + this.externalId = id; + return this; + } + + @Override + public JobCompletionNotificationRequest build() { + return new JobCompletionNotificationRequest(handler, callbackId, cluster, externalId); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java new file mode 100644 index 0000000..848f89c --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java @@ -0,0 +1,399 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.notification.service.impl; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.exception.NotificationServiceException; +import org.apache.falcon.exception.StateStoreException; +import org.apache.falcon.execution.ExecutionInstance; +import org.apache.falcon.execution.NotificationHandler; +import org.apache.falcon.notification.service.FalconNotificationService; +import org.apache.falcon.notification.service.NotificationServicesRegistry; +import org.apache.falcon.notification.service.event.Event; +import org.apache.falcon.notification.service.event.JobScheduledEvent; +import org.apache.falcon.notification.service.request.JobCompletionNotificationRequest; +import org.apache.falcon.notification.service.request.JobScheduleNotificationRequest; +import org.apache.falcon.notification.service.request.NotificationRequest; +import org.apache.falcon.predicate.Predicate; +import org.apache.falcon.state.ID; +import org.apache.falcon.state.InstanceState; +import org.apache.falcon.state.store.AbstractStateStore; +import org.apache.falcon.state.store.StateStore; +import org.apache.falcon.util.ReflectionUtils; +import org.apache.falcon.util.RuntimeProperties; +import org.apache.falcon.workflow.engine.DAGEngineFactory; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; +import 
java.util.concurrent.ExecutionException; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * This notification service notifies {@link NotificationHandler} when an execution + * instance is scheduled on a DAG Engine. + * Current implementation of scheduler handles parallel scheduling of instances, + * dependencies (an instance depending on completion of another) and priority. + */ +public class SchedulerService implements FalconNotificationService, NotificationHandler, + RemovalListener<ID, List<ExecutionInstance>> { + + private static final Logger LOG = LoggerFactory.getLogger(SchedulerService.class); + + public static final String DEFAULT_NUM_OF_SCHEDULER_THREADS = "5"; + public static final String NUM_OF_SCHEDULER_THREADS_PROP = "scheduler.threads.count"; + + // Once scheduling conditions are met, it goes to run queue to be run on DAGEngine, based on priority. + private ThreadPoolExecutor runQueue; + + private static final StateStore STATE_STORE = AbstractStateStore.get(); + + private Cache<ID, Object> instancesToIgnore; + // TODO : limit the no. of awaiting instances per entity + private LoadingCache<ID, List<ExecutionInstance>> awaitedInstances; + + @Override + public void register(NotificationRequest notifRequest) throws NotificationServiceException { + JobScheduleNotificationRequest request = (JobScheduleNotificationRequest) notifRequest; + if (request.getInstance() == null) { + throw new NotificationServiceException("Request must contain an instance."); + } + // When the instance is getting rescheduled for run. As in the case of suspend and resume. 
+ Object obj = instancesToIgnore.getIfPresent(request.getInstance().getId()); + if (obj != null) { + instancesToIgnore.invalidate(request.getInstance().getId()); + } + runQueue.execute(new InstanceRunner(request)); + } + + @Override + public void unregister(NotificationHandler handler, ID listenerID) { + // If ID is that of an entity, do nothing + if (listenerID.getInstanceTime() == null) { + return; + } + // Not efficient to iterate over elements to remove this. Add to ignore list. + instancesToIgnore.put(listenerID, new Object()); + + } + + @Override + public RequestBuilder createRequestBuilder(NotificationHandler handler, ID callbackID) { + return new JobScheduleRequestBuilder(handler, callbackID); + } + + @Override + public String getName() { + return "JobSchedulerService"; + } + + @Override + public void init() throws FalconException { + int numThreads = Integer.parseInt(RuntimeProperties.get().getProperty(NUM_OF_SCHEDULER_THREADS_PROP, + DEFAULT_NUM_OF_SCHEDULER_THREADS)); + + // Uses a priority queue to ensure instances with higher priority gets run first. + PriorityBlockingQueue<Runnable> pq = new PriorityBlockingQueue<Runnable>(20, new PriorityComparator()); + runQueue = new ThreadPoolExecutor(1, numThreads, 0L, TimeUnit.MILLISECONDS, pq); + + CacheLoader instanceCacheLoader = new CacheLoader<ID, Collection<ExecutionInstance>>() { + @Override + public Collection<ExecutionInstance> load(ID id) throws Exception { + List<InstanceState.STATE> states = new ArrayList<InstanceState.STATE>(); + states.add(InstanceState.STATE.READY); + List<ExecutionInstance> readyInstances = new ArrayList<>(); + // TODO : Limit it to no. of instances that can be run in parallel. 
+ for (InstanceState state : STATE_STORE.getExecutionInstances(id.getEntityID(), states)) { + readyInstances.add(state.getInstance()); + } + return readyInstances; + } + }; + + awaitedInstances = CacheBuilder.newBuilder() + .maximumSize(100) + .concurrencyLevel(1) + .removalListener(this) + .build(instanceCacheLoader); + + instancesToIgnore = CacheBuilder.newBuilder() + .expireAfterWrite(1, TimeUnit.HOURS) + .concurrencyLevel(1) + .build(); + // Interested in all job completion events. + JobCompletionNotificationRequest completionRequest = (JobCompletionNotificationRequest) + NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.JOB_COMPLETION) + .createRequestBuilder(this, null).build(); + NotificationServicesRegistry.register(completionRequest); + } + + @Override + public void onRemoval(RemovalNotification<ID, List<ExecutionInstance>> removalNotification) { + // When instances are removed due to size... + // Ensure instances are persisted in state store and add to another list of awaited entities. + if (removalNotification.wasEvicted()) { + for (ExecutionInstance instance : removalNotification.getValue()) { + InstanceState state = new InstanceState(instance); + state.setCurrentState(InstanceState.STATE.READY); + try { + STATE_STORE.updateExecutionInstance(state); + } catch (StateStoreException e) { + throw new RuntimeException("Unable to persist the ready instance " + instance.getId(), e); + } + } + } + } + + @Override + public void onEvent(Event event) throws FalconException { + // Interested only in job completion events. + if (event.getSource() == NotificationServicesRegistry.SERVICE.JOB_COMPLETION) { + try { + // Check if the instance is awaited. + ID id = event.getTarget(); + List<ExecutionInstance> instances = awaitedInstances.get(id); + // Else, check if the entity is awaited. 
+ if (instances == null) { + id = id.getEntityID(); + instances = awaitedInstances.get(id); + } + if (instances != null && !instances.isEmpty()) { + ExecutionInstance instance = instances.get(0); + if (instance != null && instance.getAwaitingPredicates() != null) { + for (Predicate predicate : instance.getAwaitingPredicates()) { + if (predicate.getType() == Predicate.TYPE.JOB_COMPLETION) { + // Construct a request object + NotificationHandler handler = ReflectionUtils + .getInstanceByClassName(predicate.getClauseValue("handler").toString()); + JobScheduleRequestBuilder requestBuilder = new JobScheduleRequestBuilder( + handler, instance.getId()); + requestBuilder.setInstance(instance); + InstanceRunner runner = new InstanceRunner(requestBuilder.build()); + // Since an instance just finished of the same entity just finished + if (id.equals(instance.getId())) { + runner.incrementAllowedInstances(); + } + runQueue.execute(runner); + instances.remove(instance); + } + } + } + } + if (instances != null && instances.isEmpty()) { + awaitedInstances.invalidate(id); + } + } catch (Exception e) { + throw new FalconException(e); + } + } + } + + @Override + public void destroy() throws FalconException { + runQueue.shutdownNow(); + instancesToIgnore.invalidateAll(); + } + + private void notifyFailureEvent(JobScheduleNotificationRequest request) throws FalconException { + JobScheduledEvent event = new JobScheduledEvent(request.getCallbackId(), JobScheduledEvent.STATUS.FAILED); + request.getHandler().onEvent(event); + } + + private class InstanceRunner implements Runnable { + private final ExecutionInstance instance; + private final JobScheduleNotificationRequest request; + private short priority; + private int allowedParallelInstances = 1; + + public InstanceRunner(JobScheduleNotificationRequest request) { + this.request = request; + this.instance = request.getInstance(); + this.priority = getPriority(instance.getEntity()).getPriority(); + allowedParallelInstances = 
EntityUtil.getParallel(instance.getEntity()); + } + + public int incrementAllowedInstances() { + return ++allowedParallelInstances; + } + + private EntityUtil.JOBPRIORITY getPriority(Entity entity) { + switch(entity.getEntityType()) { + case PROCESS : + return EntityUtil.getPriority((Process)entity); + default : + throw new UnsupportedOperationException("Scheduling of entities other " + + "than process is not supported yet."); + } + } + + @Override + public void run() { + try { + // If de-registered + if (instancesToIgnore.getIfPresent(instance.getId()) != null) { + LOG.debug("Instance {} has been deregistered. Ignoring.", instance.getId()); + instancesToIgnore.invalidate(instance.getId()); + return; + } + LOG.debug("Received request to run instance {}", instance.getId()); + if (checkConditions()) { + // If instance not already scheduled. + String externalId = instance.getExternalID(); + if (externalId == null) { + externalId = DAGEngineFactory.getDAGEngine(instance.getCluster()).run(instance); + LOG.info("Scheduled job {} for instance {}", externalId, instance.getId()); + } + JobScheduledEvent event = new JobScheduledEvent(instance.getId(), + JobScheduledEvent.STATUS.SUCCESSFUL); + event.setExternalID(externalId); + event.setStartTime(new DateTime(DAGEngineFactory.getDAGEngine(instance.getCluster()) + .info(externalId).getStartTime())); + request.getHandler().onEvent(event); + } + } catch (FalconException e) { + LOG.error("Error running the instance : " + instance.getId(), e); + try { + notifyFailureEvent(request); + } catch (FalconException fe) { + throw new RuntimeException("Unable to onEvent : " + request.getCallbackId(), fe); + } + } + } + + public short getPriority() { + return priority; + } + + private boolean checkConditions() throws FalconException { + try { + // TODO : If and when the no. of scheduling conditions increase, consider chaining condition checks. + // Run if all conditions are met. 
+ if (instanceCheck() && dependencyCheck()) { + return true; + } else { + ID entityID = instance.getId().getEntityID(); + // Instance is awaiting scheduling conditions to be met. Add predicate to that effect. + instance.getAwaitingPredicates().add(Predicate.createJobCompletionPredicate(request.getHandler(), + entityID)); + updateAwaitedInstances(entityID); + LOG.debug("Schedule conditions not met for instance {}. Awaiting on {}", + instance.getId(), entityID); + } + } catch (Exception e) { + LOG.error("Instance run failed with error : ", e); + throw new FalconException("Instance run failed", e); + } + return false; + } + + private void updateAwaitedInstances(ID id) throws ExecutionException { + synchronized (id) { + List<ExecutionInstance> instances = awaitedInstances.get(id); + if (instances == null) { + // Order is FIFO. + instances = new LinkedList<>(); + awaitedInstances.put(id, instances); + } + instances.add(instance); + } + } + + private boolean dependencyCheck() throws FalconException, ExecutionException { + if (request.getDependencies() == null || request.getDependencies().isEmpty()) { + return true; + } + + for (ExecutionInstance execInstance : request.getDependencies()) { + // Dependants should wait for this instance to complete. Add predicate to that effect. + instance.getAwaitingPredicates().add(Predicate.createJobCompletionPredicate( + request.getHandler(), execInstance.getId())); + updateAwaitedInstances(execInstance.getId()); + } + return false; + } + + // Ensure no. of instances running in parallel is per entity specification. 
+ private boolean instanceCheck() throws StateStoreException { + return STATE_STORE.getExecutionInstances(instance.getEntity(), instance.getCluster(), + InstanceState.getRunningStates()).size() < allowedParallelInstances; + } + } + + // A priority based comparator to be used by the {@link java.util.concurrent.PriorityBlockingQueue} + private static class PriorityComparator<T extends InstanceRunner> implements Comparator<T>, Serializable { + @Override + public int compare(T o1, T o2) { + return o1.getPriority() - o2.getPriority(); + } + } + + /** + * Builds {@link JobScheduleNotificationRequest}. + */ + public static class JobScheduleRequestBuilder extends RequestBuilder<JobScheduleNotificationRequest> { + private List<ExecutionInstance> dependencies; + private ExecutionInstance instance; + + public JobScheduleRequestBuilder(NotificationHandler handler, ID callbackID) { + super(handler, callbackID); + } + + /** + * @param execInstance that needs to be scheduled + * @return + */ + public JobScheduleRequestBuilder setInstance(ExecutionInstance execInstance) { + this.instance = execInstance; + return this; + } + + /** + * Dependencies to wait for before scheduling. 
+ * @param dependencies + */ + public void setDependencies(List<ExecutionInstance> dependencies) { + this.dependencies = dependencies; + } + + @Override + public JobScheduleNotificationRequest build() { + if (callbackId == null || instance == null) { + throw new IllegalArgumentException("Missing one or more of the mandatory arguments:" + + " callbackId, execInstance"); + } + return new JobScheduleNotificationRequest(handler, callbackId, instance, dependencies); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/request/AlarmRequest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/request/AlarmRequest.java b/scheduler/src/main/java/org/apache/falcon/notification/service/request/AlarmRequest.java new file mode 100644 index 0000000..2628dc8 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/request/AlarmRequest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.notification.service.request; + +import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.execution.NotificationHandler; +import org.apache.falcon.notification.service.NotificationServicesRegistry; +import org.apache.falcon.state.ID; +import org.joda.time.DateTime; + +import java.util.TimeZone; + +/** + * Request intended for {@link org.apache.falcon.notification.service.impl.AlarmService} + * for time based notifications. + * The setter methods of the class support chaining similar to a builder class. + * TODO : Might need a separate builder too. + */ +public class AlarmRequest extends NotificationRequest { + + private DateTime startTime; + private DateTime endTime; + private Frequency frequency; + private TimeZone timeZone; + + /** + * Constructor. + * @param notifHandler + * @param callbackId + */ + public AlarmRequest(NotificationHandler notifHandler, ID callbackId, DateTime start, + DateTime end, Frequency freq, TimeZone tz) { + this.handler = notifHandler; + this.callbackId = callbackId; + this.service = NotificationServicesRegistry.SERVICE.TIME; + this.startTime = start; + this.endTime = end; + this.frequency = freq; + this.timeZone = tz; + } + + /** + * @return frequency of the timer + */ + public Frequency getFrequency() { + return frequency; + } + + /** + * @return start time of the timer + */ + public DateTime getStartTime() { + return startTime; + } + + /** + * @return end time of the timer + */ + public DateTime getEndTime() { + return endTime; + } + + /** + * @return timezone of the request. 
+ */ + public TimeZone getTimeZone() { + return timeZone; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java b/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java new file mode 100644 index 0000000..8393de0 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.notification.service.request; + +import org.apache.falcon.execution.NotificationHandler; +import org.apache.falcon.notification.service.NotificationServicesRegistry; +import org.apache.falcon.state.ID; +import org.apache.hadoop.fs.Path; + +/** + * Request intended for {@link import org.apache.falcon.notification.service.impl.DataAvailabilityService} + * for data notifications. + * The setter methods of the class support chaining similar to a builder class. 
+ * TODO : Complete/modify this skeletal class + */ +public class DataNotificationRequest extends NotificationRequest { + private final Path dataLocation; + private String cluster; + + /** + * @return data location to be watched. + */ + public Path getDataLocation() { + return dataLocation; + } + + /** + * Given a number of instances, should the service wait for exactly those many, + * at least those many or at most those many instances. + */ + public enum INSTANCELIMIT { + EXACTLY_N, + AT_LEAST_N, + AT_MOST_N + } + + /** + * Constructor. + * @param notifHandler + * @param callbackId + */ + public DataNotificationRequest(NotificationHandler notifHandler, ID callbackId, Path location) { + this.handler = notifHandler; + this.callbackId = callbackId; + this.dataLocation = location; + this.service = NotificationServicesRegistry.SERVICE.DATA; + } + + /** + * @return cluster name + */ + public String getCluster() { + return cluster; + } + + /** + * @param clusterName + * @return This instance + */ + public DataNotificationRequest setCluster(String clusterName) { + this.cluster = clusterName; + return this; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobCompletionNotificationRequest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobCompletionNotificationRequest.java b/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobCompletionNotificationRequest.java new file mode 100644 index 0000000..1d35476 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobCompletionNotificationRequest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.notification.service.request; + +import org.apache.falcon.execution.NotificationHandler; +import org.apache.falcon.notification.service.NotificationServicesRegistry; +import org.apache.falcon.state.ID; + +/** + * Request intended for {@link org.apache.falcon.notification.service.impl.JobCompletionService} + * for job completion notifications. + * The setter methods of the class support chaining similar to a builder class. + */ +public class JobCompletionNotificationRequest extends NotificationRequest { + + private String externalId; + private String cluster; + /** + * Constructor. + * @param notifHandler + * @param callbackId + */ + public JobCompletionNotificationRequest(NotificationHandler notifHandler, ID callbackId, String clstr, + String jobId) { + this.handler = notifHandler; + this.service = NotificationServicesRegistry.SERVICE.JOB_COMPLETION; + this.callbackId = callbackId; + this.cluster = clstr; + this.externalId = jobId; + } + + /** + * @return - The external job id for which job completion notification is requested. 
+ */ + public String getExternalId() { + return externalId; + } + + /** + * @return cluster name + */ + public String getCluster() { + return cluster; + } + + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobScheduleNotificationRequest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobScheduleNotificationRequest.java b/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobScheduleNotificationRequest.java new file mode 100644 index 0000000..80133bd --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobScheduleNotificationRequest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.notification.service.request; + +import org.apache.falcon.execution.ExecutionInstance; +import org.apache.falcon.execution.NotificationHandler; +import org.apache.falcon.notification.service.NotificationServicesRegistry; +import org.apache.falcon.state.ID; + +import java.util.List; + +/** + * Request intended for {@link org.apache.falcon.notification.service.impl.SchedulerService} + * for job run notifications. + * The setter methods of the class support chaining similar to a builder class. + */ +public class JobScheduleNotificationRequest extends NotificationRequest { + private ExecutionInstance instance; + private List<ExecutionInstance> dependencies; + + /** + * Constructor. + * @param notifHandler + * @param id + */ + public JobScheduleNotificationRequest(NotificationHandler notifHandler, ID id, ExecutionInstance inst, + List<ExecutionInstance> deps) { + this.handler = notifHandler; + this.service = NotificationServicesRegistry.SERVICE.JOB_SCHEDULE; + this.callbackId = id; + this.instance = inst; + this.dependencies = deps; + } + + /** + * @return execution instance that will be scheduled. 
+ */ + public ExecutionInstance getInstance() { + return instance; + } + + public List<ExecutionInstance> getDependencies() { + return dependencies; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/request/NotificationRequest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/request/NotificationRequest.java b/scheduler/src/main/java/org/apache/falcon/notification/service/request/NotificationRequest.java new file mode 100644 index 0000000..c89668d --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/request/NotificationRequest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.notification.service.request; + +import org.apache.falcon.execution.NotificationHandler; +import org.apache.falcon.notification.service.NotificationServicesRegistry; +import org.apache.falcon.state.ID; + +/** + * An abstract class that all notification requests of services must extend. 
+ * TODO : Complete/modify this skeleton class + */ +public abstract class NotificationRequest { + protected NotificationHandler handler; + protected ID callbackId; + protected NotificationServicesRegistry.SERVICE service; + + /** + * @return - The service that this request is intended for + */ + public NotificationServicesRegistry.SERVICE getService() { + return service; + } + + /** + * @return - The entity that needs to be notified when this request is satisfied. + */ + public ID getCallbackId() { + return callbackId; + } + + /** + * @return - The notification handler. + */ + public NotificationHandler getHandler() { + return handler; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java new file mode 100644 index 0000000..fb4ce82 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.predicate; + +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.v0.feed.Location; +import org.apache.falcon.execution.NotificationHandler; +import org.apache.falcon.notification.service.NotificationServicesRegistry; +import org.apache.falcon.notification.service.event.DataEvent; +import org.apache.falcon.notification.service.event.Event; +import org.apache.falcon.notification.service.event.TimeElapsedEvent; +import org.apache.falcon.state.ID; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +/** + * Represents the gating condition for which an instance is waiting before it is scheduled. + * This will be serialized and stored in state store. + */ +public class Predicate implements Serializable { + /** + * Type of predicate, currently data and time are supported. + */ + public enum TYPE { + DATA, + TIME, + JOB_COMPLETION + } + + private final TYPE type; + + // A key-value pair of clauses that need make this predicate. + private Map<String, Comparable> clauses = new HashMap<String, Comparable>(); + + // A generic "any" object that can be used when a particular key is allowed to have any value. + public static final Comparable<? extends Serializable> ANY = new Any(); + + /** + * @return type of predicate + */ + public TYPE getType() { + return type; + } + + /** + * @param key + * @return the value corresponding to the key + */ + public Comparable getClauseValue(String key) { + return clauses.get(key); + } + + /** + * Compares this predicate with the supplied predicate. + * + * @param suppliedPredicate + * @return true, if the clauses of the predicates match. false, otherwise. + */ + public boolean evaluate(Predicate suppliedPredicate) { + if (type != suppliedPredicate.getType()) { + return false; + } + boolean eval = true; + // Iterate over each clause and ensure it matches the clauses of this predicate. 
+ for (Map.Entry<String, Comparable> entry : suppliedPredicate.getClauses().entrySet()) { + eval = eval && matches(entry.getKey(), entry.getValue()); + if (!eval) { + return false; + } + } + return true; + } + + // Compares the two values of a key. + private boolean matches(String lhs, Comparable<? extends Serializable> rhs) { + if (clauses.containsKey(lhs) && clauses.get(lhs) != null + && rhs != null) { + if (clauses.get(lhs).equals(ANY) || rhs.equals(ANY)) { + return true; + } else { + return clauses.get(lhs).compareTo(rhs) == 0; + } + } + return false; + } + + /** + * @param type of predicate + */ + public Predicate(TYPE type) { + this.type = type; + } + + /** + * @return the name-value pairs that make up the clauses of this predicate. + */ + public Map<String, Comparable> getClauses() { + return clauses; + } + + /** + * @param lhs - The key in the key-value pair of a clause + * @param rhs - The value in the key-value pair of a clause + * @return This instance + */ + public Predicate addClause(String lhs, Comparable<? extends Serializable> rhs) { + clauses.put(lhs, rhs); + return this; + } + + /** + * Creates a Predicate of Type TIME. + * + * @param start + * @param end + * @param instanceTime + * @return + */ + public static Predicate createTimePredicate(long start, long end, long instanceTime) { + return new Predicate(TYPE.TIME) + .addClause("start", (start < 0) ? ANY : start) + .addClause("end", (end < 0) ? ANY : end) + .addClause("instanceTime", (instanceTime < 0) ? ANY : instanceTime); + } + + /** + * Creates a predicate of type DATA. + * + * @param location + * @return + */ + public static Predicate createDataPredicate(Location location) { + return new Predicate(TYPE.DATA) + .addClause("path", (location == null) ? ANY : location.getPath()) + .addClause("type", (location == null) ? ANY : location.getType()); + } + + /** + * Creates a predicate of type JOB_COMPLETION. 
+ * + * @param handler + * @param id + * @return + */ + public static Predicate createJobCompletionPredicate(NotificationHandler handler, ID id) { + return new Predicate(TYPE.JOB_COMPLETION) + .addClause("instanceId", id.toString()) + .addClause("handler", handler.getClass().getName()); + } + + /** + * Creates a predicate from an event based on the event source and values in the event. + * + * @param event + * @return + * @throws FalconException + */ + public static Predicate getPredicate(Event event) throws FalconException { + if (event.getSource() == NotificationServicesRegistry.SERVICE.DATA) { + DataEvent dataEvent = (DataEvent) event; + if (dataEvent.getDataLocation() != null && dataEvent.getDataType() != null) { + Location loc = new Location(); + loc.setPath(dataEvent.getDataLocation().toString()); + loc.setType(dataEvent.getDataType()); + return createDataPredicate(loc); + } else { + throw new FalconException("Event does not have enough data to create a predicate"); + } + } else if (event.getSource() == NotificationServicesRegistry.SERVICE.TIME) { + TimeElapsedEvent timeEvent = (TimeElapsedEvent) event; + if (timeEvent.getStartTime() != null && timeEvent.getEndTime() != null) { + long instanceTime = (timeEvent.getInstanceTime() == null)? -1 : timeEvent.getInstanceTime().getMillis(); + return Predicate.createTimePredicate(timeEvent.getStartTime().getMillis(), + timeEvent.getEndTime().getMillis(), instanceTime); + } else { + throw new FalconException("Event does not have enough data to create a predicate"); + } + + } else { + throw new FalconException("Unhandled event type " + event.getSource()); + } + } + + /** + * An "Any" class that returns '0' when compared to any other object. 
+ */ + private static class Any implements Comparable, Serializable { + @Override + public int compareTo(Object o) { + return 0; + } + + @Override + public boolean equals(Object o) { + return super.equals(o); + } + + @Override + public int hashCode() { + return super.hashCode(); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java new file mode 100644 index 0000000..15aea9a --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.state; + +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.exception.InvalidStateTransitionException; + +/** + * Represents the state of a schedulable entity. + * Implements {@link org.apache.falcon.state.StateMachine} for an entity. 
+ */ +public class EntityState implements StateMachine<EntityState.STATE, EntityState.EVENT> { + private Entity entity; + private STATE currentState; + private static final STATE INITIAL_STATE = STATE.SUBMITTED; + + /** + * Enumerates all the valid states of a schedulable entity and the valid transitions from that state. + */ + public enum STATE implements StateMachine<EntityState.STATE, EntityState.EVENT> { + SUBMITTED { + @Override + public STATE nextTransition(EVENT event) throws InvalidStateTransitionException { + switch (event) { + case SCHEDULE: + return STATE.SCHEDULED; + case SUBMIT: + return this; + default: + throw new InvalidStateTransitionException("Submitted entities can only be scheduled."); + } + } + }, + SCHEDULED { + @Override + public STATE nextTransition(EVENT event) throws InvalidStateTransitionException { + switch (event) { + case SUSPEND: + return STATE.SUSPENDED; + case SCHEDULE: + return this; + default: + throw new InvalidStateTransitionException("Scheduled entities can only be suspended."); + } + } + }, + SUSPENDED { + @Override + public STATE nextTransition(EVENT event) throws InvalidStateTransitionException { + switch (event) { + case RESUME: + return STATE.SCHEDULED; + case SUSPEND: + return this; + default: + throw new InvalidStateTransitionException("Suspended entities can only be resumed."); + } + } + } + } + + /** + * Enumerates all the valid events that can cause a state transition. + */ + public enum EVENT { + SUBMIT, + SCHEDULE, + SUSPEND, + RESUME + } + + /** + * Constructor. + * + * @param e - Entity + */ + public EntityState(Entity e) { + this.entity = e; + currentState = INITIAL_STATE; + } + + /** + * @return - The entity + */ + public Entity getEntity() { + return entity; + } + + /** + * @param e - entity + * @return - This instance + */ + public EntityState setEntity(Entity e) { + this.entity = e; + return this; + } + + /** + * @return - Current state of the entity. 
+ */ + public STATE getCurrentState() { + return currentState; + } + + /** + * @param state + * @return - This instance + */ + public EntityState setCurrentState(STATE state) { + this.currentState = state; + return this; + } + + @Override + public STATE nextTransition(EVENT event) throws InvalidStateTransitionException { + return currentState.nextTransition(event); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java b/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java new file mode 100644 index 0000000..44ec3fc --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.state; + +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.v0.Entity; + +/** + * Any handler interested in handling state changes of entities must implement this interface. 
+ */ +public interface EntityStateChangeHandler { + + /** + * Invoked when an entity is submitted. + * + * @param entity + * @throws FalconException + */ + void onSubmit(Entity entity) throws FalconException; + + /** + * Invoked when an entity is scheduled. + * + * @param entity + * @throws FalconException + */ + void onSchedule(Entity entity) throws FalconException; + + /** + * Invoked when an entity is suspended. + * + * @param entity + * @throws FalconException + */ + void onSuspend(Entity entity) throws FalconException; + + /** + * Invoked when the an intity is resumed. + * + * @param entity + * @throws FalconException + */ + void onResume(Entity entity) throws FalconException; +}
