http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java
 
b/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java
new file mode 100644
index 0000000..4883fe7
--- /dev/null
+++ 
b/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.event;
+
+
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * An event generated by
+ * {@link org.apache.falcon.notification.service.impl.DataAvailabilityService}
+ * indicating availability or non-availability of a dataset.
+ */
+public class DataEvent implements Event {
+    private final ID callbackID;
+    private final Path dataLocation;
+    private final LocationType dataType;
+    // Deliberately not final: the status can be replaced after construction through setStatus.
+    private STATUS status;
+
+    /**
+     * Enumerates the status of data.
+     */
+    public enum STATUS {
+        AVAILABLE,
+        UNAVAILABLE
+    }
+
+    /**
+     * @param callbackID ID that {@link #getTarget()} returns
+     * @param location path that {@link #getDataLocation()} returns
+     * @param locType location type that {@link #getDataType()} returns
+     * @param availability initial status of the data
+     */
+    public DataEvent(ID callbackID, Path location, LocationType locType, STATUS availability) {
+        this.callbackID = callbackID;
+        this.dataLocation = location;
+        this.dataType = locType;
+        this.status = availability;
+    }
+
+    /**
+     * @return The current status of the data
+     */
+    public STATUS getStatus() {
+        return status;
+    }
+
+    /**
+     * @param availability new status of the data
+     */
+    public void setStatus(STATUS availability) {
+        this.status = availability;
+    }
+
+    /**
+     * @return The data location this event was created with
+     */
+    public Path getDataLocation() {
+        return dataLocation;
+    }
+
+    /**
+     * @return The location type this event was created with
+     */
+    public LocationType getDataType() {
+        return dataType;
+    }
+
+    @Override
+    public NotificationServicesRegistry.SERVICE getSource() {
+        return NotificationServicesRegistry.SERVICE.DATA;
+    }
+
+    @Override
+    public ID getTarget() {
+        return callbackID;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java
 
b/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java
new file mode 100644
index 0000000..140973b
--- /dev/null
+++ 
b/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.event;
+
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+
+/**
+ * All events that are generated by notification services must implement this interface.
+ */
+public interface Event {
+
+    /**
+     * @return The service that generated this event
+     */
+    NotificationServicesRegistry.SERVICE getSource();
+
+    /**
+     * @return ID of the notification handler for which this event is meant.
+     */
+    ID getTarget();
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java
 
b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java
new file mode 100644
index 0000000..c587343
--- /dev/null
+++ 
b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.event;
+
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+import org.apache.oozie.client.WorkflowJob;
+import org.joda.time.DateTime;
+
+/**
+ * An event generated by
+ * {@link org.apache.falcon.notification.service.impl.JobCompletionService}
+ * indicating completion of a Job.
+ */
+public class JobCompletedEvent implements Event {
+
+    private final WorkflowJob.Status status;
+    private final ID callbackID;
+    private final DateTime endTime;
+
+    /**
+     * @param callbackID ID that {@link #getTarget()} returns
+     * @param jobStatus status that {@link #getStatus()} returns
+     * @param end time that {@link #getEndTime()} returns
+     */
+    public JobCompletedEvent(ID callbackID, WorkflowJob.Status jobStatus, DateTime end) {
+        this.callbackID = callbackID;
+        this.status = jobStatus;
+        this.endTime = end;
+    }
+
+    /**
+     * @return The job status this event was created with
+     */
+    public WorkflowJob.Status getStatus() {
+        return status;
+    }
+
+    @Override
+    public NotificationServicesRegistry.SERVICE getSource() {
+        return NotificationServicesRegistry.SERVICE.JOB_COMPLETION;
+    }
+
+    @Override
+    public ID getTarget() {
+        return callbackID;
+    }
+
+    /**
+     * @return The end time this event was created with
+     */
+    public DateTime getEndTime() {
+        return endTime;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java
 
b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java
new file mode 100644
index 0000000..55023e7
--- /dev/null
+++ 
b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.event;
+
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+import org.joda.time.DateTime;
+
+/**
+ * An event generated by
+ * {@link org.apache.falcon.notification.service.impl.SchedulerService}
+ * indicating if an instance was scheduled for execution.
+ */
+public class JobScheduledEvent implements Event {
+    private final ID callbackID;
+    // Not set by the constructor; null until setExternalID is called.
+    private String externalID;
+    private final STATUS status;
+    // Not set by the constructor; null until setStartTime is called.
+    private DateTime startTime;
+
+    /**
+     * @param callbackID ID that {@link #getTarget()} returns
+     * @param status status that {@link #getStatus()} returns
+     */
+    public JobScheduledEvent(ID callbackID, STATUS status) {
+        this.callbackID = callbackID;
+        this.status = status;
+    }
+
+    /**
+     * @return The external ID, or null if none has been set
+     */
+    public String getExternalID() {
+        return externalID;
+    }
+
+    /**
+     * @param externalID external ID to attach to this event
+     */
+    public void setExternalID(String externalID) {
+        this.externalID = externalID;
+    }
+
+    @Override
+    public NotificationServicesRegistry.SERVICE getSource() {
+        return NotificationServicesRegistry.SERVICE.JOB_SCHEDULE;
+    }
+
+    @Override
+    public ID getTarget() {
+        return callbackID;
+    }
+
+    /**
+     * @return - The status of the scheduled DAG/Job.
+     */
+    public STATUS getStatus() {
+        return status;
+    }
+
+    /**
+     * @return The start time, or null if none has been set
+     */
+    public DateTime getStartTime() {
+        return startTime;
+    }
+
+    /**
+     * @param startTime start time to attach to this event
+     */
+    public void setStartTime(DateTime startTime) {
+        this.startTime = startTime;
+    }
+
+    /**
+     * Enumeration of possible statuses of a DAG/Job.
+     */
+    public enum STATUS {
+        FAILED,
+        SUCCESSFUL
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java
 
b/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java
new file mode 100644
index 0000000..7ec4de6
--- /dev/null
+++ 
b/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.event;
+
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+import org.joda.time.DateTime;
+
+/**
+ * An event generated by
+ * {@link org.apache.falcon.notification.service.impl.AlarmService}
+ * indicating that a given time duration has elapsed.
+ */
+public class TimeElapsedEvent implements Event {
+    private final DateTime startTime;
+    private final DateTime endTime;
+    private final DateTime instanceTime;
+    private final ID callbackID;
+
+    /**
+     * @param callbackID ID that {@link #getTarget()} returns
+     * @param start time that {@link #getStartTime()} returns
+     * @param end time that {@link #getEndTime()} returns
+     * @param instTime time that {@link #getInstanceTime()} returns
+     */
+    public TimeElapsedEvent(ID callbackID, DateTime start, DateTime end, DateTime instTime) {
+        this.callbackID = callbackID;
+        this.startTime = start;
+        this.endTime = end;
+        this.instanceTime = instTime;
+    }
+
+    /**
+     * @return The instance time this event was created with
+     */
+    public DateTime getInstanceTime() {
+        return instanceTime;
+    }
+
+    /**
+     * @return The start time this event was created with
+     */
+    public DateTime getStartTime() {
+        return startTime;
+    }
+
+    /**
+     * @return The end time this event was created with
+     */
+    public DateTime getEndTime() {
+        return endTime;
+    }
+
+    @Override
+    public NotificationServicesRegistry.SERVICE getSource() {
+        return NotificationServicesRegistry.SERVICE.TIME;
+    }
+
+    @Override
+    public ID getTarget() {
+        return callbackID;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/AlarmService.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/AlarmService.java
 
b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/AlarmService.java
new file mode 100644
index 0000000..cccdeac
--- /dev/null
+++ 
b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/AlarmService.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.impl;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.exception.NotificationServiceException;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.execution.SchedulerUtil;
+import org.apache.falcon.notification.service.FalconNotificationService;
+import org.apache.falcon.notification.service.event.TimeElapsedEvent;
+import org.apache.falcon.notification.service.request.NotificationRequest;
+import org.apache.falcon.notification.service.request.AlarmRequest;
+import org.apache.falcon.state.ID;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.quartz.CalendarIntervalTrigger;
+import org.quartz.DateBuilder;
+import org.quartz.Job;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.TriggerKey;
+import org.quartz.impl.StdSchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.quartz.CalendarIntervalScheduleBuilder.calendarIntervalSchedule;
+import static org.quartz.JobBuilder.newJob;
+import static org.quartz.TriggerBuilder.newTrigger;
+
+/**
+ * This notification service notifies {@link NotificationHandler} when 
requested time
+ * event has occurred. The class users to subscribe to frequency based, cron 
based or some calendar based time events.
+ */
+public class AlarmService implements FalconNotificationService {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AlarmService.class);
+
+    private Map<ID, TriggerKey> notifications = new HashMap<ID, TriggerKey>();
+    private static ScheduledThreadPoolExecutor executor = new 
ScheduledThreadPoolExecutor(10);
+    private Scheduler scheduler;
+
+    @Override
+    public void init() throws FalconException {
+        try {
+            scheduler = StdSchedulerFactory.getDefaultScheduler();
+            scheduler.start();
+        } catch (SchedulerException e) {
+            throw new FalconException(e);
+        }
+    }
+
+    @Override
+    public void register(NotificationRequest notificationRequest) throws 
NotificationServiceException {
+        LOG.info("Registering alarm notification for " + 
notificationRequest.getCallbackId());
+        AlarmRequest request = (AlarmRequest) notificationRequest;
+        DateTime currentTime = DateTime.now();
+        DateTime nextStartTime = request.getStartTime();
+        DateTime endTime;
+        if (request.getEndTime().isBefore(currentTime)) {
+            endTime = request.getEndTime();
+        } else {
+            endTime = currentTime;
+        }
+        // Handle past events.
+        // TODO : Quartz doesn't seem to support running jobs for past events.
+        // TODO : Remove the handling of past events when that support is 
added.
+        if (request.getStartTime().isBefore(currentTime)) {
+
+            List<Date> instanceTimes = 
EntityUtil.getInstanceTimes(request.getStartTime().toDate(),
+                    request.getFrequency(), request.getTimeZone(), 
request.getStartTime().toDate(),
+                    endTime.toDate());
+            if (instanceTimes != null && !instanceTimes.isEmpty()) {
+                Date lastInstanceTime = instanceTimes.get(instanceTimes.size() 
- 1);
+                nextStartTime = new DateTime(lastInstanceTime.getTime()
+                        + SchedulerUtil.getFrequencyInMillis(new 
DateTime(lastInstanceTime), request.getFrequency()));
+                // Introduce some delay to allow for rest of the registration 
to complete.
+                LOG.debug("Triggering events for past from {} till {}", 
instanceTimes.get(0), lastInstanceTime);
+                executor.schedule(new CatchupJob(request, instanceTimes), 1, 
TimeUnit.SECONDS);
+            }
+        }
+        // All past events have been scheduled. Nothing to schedule in the 
future.
+        if (request.getEndTime().isBefore(currentTime)) {
+            return;
+        }
+        LOG.debug("Scheduling to trigger events from {} to {} with frequency 
{}", nextStartTime, request.getEndTime(),
+                request.getFrequency());
+        // Schedule future events using Quartz
+        CalendarIntervalTrigger trigger = newTrigger()
+                .withIdentity(notificationRequest.getCallbackId().toString(), 
"Falcon")
+                .startAt(nextStartTime.toDate())
+                .endAt(request.getEndTime().toDate())
+                .withSchedule(
+                        calendarIntervalSchedule()
+                                
.withInterval(request.getFrequency().getFrequencyAsInt(),
+                                       
getTimeUnit(request.getFrequency().getTimeUnit()))
+                                
.withMisfireHandlingInstructionFireAndProceed())
+                .build();
+
+        // define the job and tie it to our Job class
+        JobDetail job = newJob(FalconProcessJob.class)
+                
.withIdentity(getJobKey(notificationRequest.getCallbackId().toString()))
+                .setJobData(getJobDataMap((AlarmRequest) notificationRequest))
+                .build();
+        notifications.put(notificationRequest.getCallbackId(), 
trigger.getKey());
+        // Tell quartz to run the job using our trigger
+        try {
+            scheduler.scheduleJob(job, trigger);
+        } catch (SchedulerException e) {
+            LOG.error("Error scheduling entity {}", trigger.getKey());
+            throw new NotificationServiceException(e);
+        }
+    }
+
+    // Maps the timeunit in entity specification to the one in Quartz 
DateBuilder
+    private DateBuilder.IntervalUnit getTimeUnit(Frequency.TimeUnit timeUnit) {
+        switch (timeUnit) {
+        case minutes:
+            return DateBuilder.IntervalUnit.MINUTE;
+        case hours:
+            return DateBuilder.IntervalUnit.HOUR;
+        case days:
+            return DateBuilder.IntervalUnit.DAY;
+        case months:
+            return DateBuilder.IntervalUnit.MONTH;
+        default:
+            throw new IllegalArgumentException("Invalid time unit " + 
timeUnit.name());
+        }
+    }
+
+    private JobKey getJobKey(String key) {
+        return new JobKey(key, "Falcon");
+    }
+
+    private JobDataMap getJobDataMap(AlarmRequest request) {
+        JobDataMap jobProps = new JobDataMap();
+        jobProps.put("request", request);
+
+        return jobProps;
+    }
+
+    @Override
+    public void unregister(NotificationHandler handler, ID listenerID) throws 
NotificationServiceException {
+        try {
+            LOG.info("Removing time notification for handler {} with 
callbackID {}", handler, listenerID);
+            scheduler.unscheduleJob(notifications.get(listenerID));
+            notifications.remove(listenerID);
+        } catch (SchedulerException e) {
+            throw new NotificationServiceException("Unable to deregister " + 
listenerID, e);
+        }
+    }
+
+    @Override
+    public RequestBuilder createRequestBuilder(NotificationHandler handler, ID 
callbackID) {
+        return new AlarmRequestBuilder(handler, callbackID);
+    }
+
+    @Override
+    public String getName() {
+        return "AlarmService";
+    }
+
+    @Override
+    public void destroy() throws FalconException {
+        try {
+            scheduler.shutdown();
+        } catch (SchedulerException e) {
+            LOG.warn("Quartz Scheduler shutdown failed.", e);
+        }
+
+    }
+
+    // Generates a time elapsed event and invokes onEvent on the handler.
+    private static void notifyHandler(AlarmRequest request, DateTime 
instanceTime) throws NotificationServiceException {
+        TimeElapsedEvent event = new TimeElapsedEvent(request.getCallbackId(), 
request.getStartTime(),
+                request.getEndTime(), instanceTime);
+        try {
+            LOG.info("Sending notification to {} with nominal time {} ", 
request.getCallbackId(),
+                    event.getInstanceTime());
+            request.getHandler().onEvent(event);
+        } catch (FalconException e) {
+            LOG.error("Unable to onEvent " + request.getCallbackId() + " for 
nominal time, " + instanceTime, e);
+            throw new NotificationServiceException(e);
+        }
+    }
+
+    /**
+     * The Job that runs when a time trigger happens.
+     */
+    public static class FalconProcessJob implements Job {
+        public FalconProcessJob() {
+        }
+
+        @Override
+        public void execute(JobExecutionContext jobExecutionContext) throws 
JobExecutionException {
+            LOG.debug("Quartz job called at : {}, Next fire time: {}", 
jobExecutionContext.getFireTime(),
+                    jobExecutionContext.getNextFireTime());
+
+            AlarmRequest request = (AlarmRequest) 
jobExecutionContext.getJobDetail()
+                    .getJobDataMap().get("request");
+            DateTime instanceTime = new 
DateTime(jobExecutionContext.getScheduledFireTime(),
+                    DateTimeZone.forTimeZone(request.getTimeZone()));
+
+            try {
+                notifyHandler(request, instanceTime);
+            } catch (NotificationServiceException e) {
+                throw new JobExecutionException(e);
+            }
+        }
+    }
+
+    // Quartz doesn't seem to be able to schedule past events. This job 
specifically handles that.
+    private static class CatchupJob implements Runnable {
+
+        private final AlarmRequest request;
+        private final List<Date> instanceTimes;
+
+        public CatchupJob(AlarmRequest request, List<Date> triggerTimes) {
+            this.request = request;
+            this.instanceTimes = triggerTimes;
+        }
+
+        @Override
+        public void run() {
+            if (instanceTimes == null) {
+                return;
+            }
+            // Immediate notification for all past events.
+            for(Date instanceTime : instanceTimes) {
+                DateTime nominalDateTime = new DateTime(instanceTime, 
DateTimeZone.forTimeZone(request.getTimeZone()));
+                try {
+                    notifyHandler(request, nominalDateTime);
+                } catch (NotificationServiceException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Builder that builds {@link AlarmRequest}.
+     */
+    public static class AlarmRequestBuilder extends 
RequestBuilder<AlarmRequest> {
+
+        private DateTime startTime;
+        private DateTime endTime;
+        private Frequency frequency;
+        private TimeZone timeZone;
+
+        public AlarmRequestBuilder(NotificationHandler handler, ID callbackID) 
{
+            super(handler, callbackID);
+        }
+
+        /**
+         * @param start of the timer
+         * @return This instance
+         */
+        public AlarmRequestBuilder setStartTime(DateTime start) {
+            this.startTime = start;
+            return this;
+        }
+
+        /**
+         * @param end of the timer
+         * @return This instance
+         */
+        public AlarmRequestBuilder setEndTime(DateTime end) {
+            this.endTime = end;
+            return this;
+        }
+
+        /**
+         * @param freq of the timer
+         * @return This instance
+         */
+        public AlarmRequestBuilder setFrequency(Frequency freq) {
+            this.frequency = freq;
+            return this;
+        }
+
+        /**
+         * @param timeZone
+         */
+        public void setTimeZone(TimeZone timeZone) {
+            this.timeZone = timeZone;
+        }
+
+        @Override
+        public AlarmRequest build() {
+            if (callbackId == null || startTime == null || endTime == null || 
frequency == null) {
+                throw new IllegalArgumentException("Missing one or more of the 
mandatory arguments:"
+                        + " callbackId, startTime, endTime, frequency");
+            }
+            return new AlarmRequest(handler, callbackId, startTime, endTime, 
frequency, timeZone);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java
 
b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java
new file mode 100644
index 0000000..7ffb351
--- /dev/null
+++ 
b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.impl;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.exception.NotificationServiceException;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.FalconNotificationService;
+import org.apache.falcon.notification.service.request.DataNotificationRequest;
+import org.apache.falcon.notification.service.request.NotificationRequest;
+import org.apache.falcon.state.ID;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This notification service notifies {@link NotificationHandler} when 
requested data
+ * becomes available. This class also supports time out, in which case it 
notifies about the unavailability.
+ * TODO : Complete/Modify this skeletal class
+ */
+public class DataAvailabilityService implements FalconNotificationService {
+
+    @Override
+    public void register(NotificationRequest request) throws 
NotificationServiceException {
+        // TODO : Implement this
+    }
+
+    @Override
+    public void unregister(NotificationHandler handler, ID listenerID) {
+        // TODO : Implement this
+    }
+
+    @Override
+    public RequestBuilder createRequestBuilder(NotificationHandler handler, ID 
callbackID) {
+        return new DataRequestBuilder(handler, callbackID);
+    }
+
+    @Override
+    public String getName() {
+        return "DataAvailabilityService";
+    }
+
+    @Override
+    public void init() throws FalconException {
+        // TODO : Implement this
+    }
+
+    @Override
+    public void destroy() throws FalconException {
+
+    }
+
+    /**
+     * Builds {@link DataNotificationRequest}.
+     */
+    public static class DataRequestBuilder extends 
RequestBuilder<DataNotificationRequest> {
+        private Path dataLocation;
+
+        public DataRequestBuilder(NotificationHandler handler, ID callbackID) {
+            super(handler, callbackID);
+        }
+
+        /**
+         * @param location
+         * @return This instance
+         */
+        public DataRequestBuilder setDataLocation(Path location) {
+            this.dataLocation = location;
+            return this;
+        }
+
+        @Override
+        public DataNotificationRequest build() {
+            if (callbackId == null  || dataLocation == null) {
+                throw new IllegalArgumentException("Missing one or more of the 
mandatory arguments:"
+                        + " callbackId, dataLocation");
+            }
+            return new DataNotificationRequest(handler, callbackId, 
dataLocation);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
 
b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
new file mode 100644
index 0000000..73a4199
--- /dev/null
+++ 
b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.impl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.exception.NotificationServiceException;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.FalconNotificationService;
+import org.apache.falcon.notification.service.event.JobCompletedEvent;
+import 
org.apache.falcon.notification.service.request.JobCompletionNotificationRequest;
+import org.apache.falcon.notification.service.request.NotificationRequest;
+import org.apache.falcon.service.Services;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.WorkflowExecutionListener;
+import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
+import org.apache.falcon.workflow.engine.DAGEngineFactory;
+import org.apache.oozie.client.WorkflowJob;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TimeZone;
+
+/**
+ * This notification service notifies {@link NotificationHandler} when an external job
+ * completes.
+ * <p>
+ * Completion is learnt in two ways: through the callbacks of
+ * {@link WorkflowExecutionListener} (this class registers itself with
+ * {@link WorkflowJobEndNotificationService} in {@link #init()}), and by querying the DAG engine
+ * when a request that already names an external job is registered.
+ * Every completion is broadcast to all registered handlers.
+ */
+public class JobCompletionService implements FalconNotificationService, WorkflowExecutionListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobCompletionService.class);
+    private static final DateTimeZone UTC = DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC"));
+
+    // Synchronized wrapper: individual add/remove calls are atomic, but iteration still
+    // has to hold the list's monitor (see onEnd).
+    private final List<NotificationHandler> listeners =
+            Collections.synchronizedList(new ArrayList<NotificationHandler>());
+
+    /**
+     * Adds the handler of the request to the listeners. If the request names both an external
+     * job id and a cluster, the job's configuration is fetched from the DAG engine and, when the
+     * job has already failed or succeeded, all listeners are notified synchronously.
+     *
+     * @param notifRequest a {@link JobCompletionNotificationRequest}
+     * @throws NotificationServiceException if the request is null or the job lookup/notification fails
+     */
+    @Override
+    public void register(NotificationRequest notifRequest) throws NotificationServiceException {
+        if (notifRequest == null) {
+            throw new NotificationServiceException("Request object cannot be null");
+        }
+        // Cast before touching the listener list, so that a request of the wrong type
+        // fails without leaving its handler registered.
+        JobCompletionNotificationRequest request = (JobCompletionNotificationRequest) notifRequest;
+        listeners.add(request.getHandler());
+        // Check if the job is already complete.
+        // If yes, send a notification synchronously.
+        // If not, we expect that this class will get notified when the job completes
+        // as this class is a listener to WorkflowJobEndNotificationService.
+        if (request.getExternalId() != null && request.getCluster() != null) {
+            try {
+                Properties props = DAGEngineFactory.getDAGEngine(request.getCluster())
+                        .getConfiguration(request.getExternalId());
+                WorkflowExecutionContext context = createContext(props);
+                if (context.hasWorkflowFailed()) {
+                    onFailure(context);
+                } else if (context.hasWorkflowSucceeded()) {
+                    onSuccess(context);
+                }
+            } catch (FalconException e) {
+                throw new NotificationServiceException(e);
+            }
+        }
+    }
+
+    /**
+     * Removes one registration of the handler; the listener ID is not used.
+     */
+    @Override
+    public void unregister(NotificationHandler handler, ID listenerID) {
+        listeners.remove(handler);
+    }
+
+    @Override
+    public RequestBuilder createRequestBuilder(NotificationHandler handler, ID callbackID) {
+        return new JobCompletionRequestBuilder(handler, callbackID);
+    }
+
+    @Override
+    public String getName() {
+        return "JobCompletionService";
+    }
+
+    /**
+     * Registers this service as a listener of {@link WorkflowJobEndNotificationService}.
+     */
+    @Override
+    public void init() throws FalconException {
+        LOG.debug("Registering to job end notification service");
+        Services.get().<WorkflowJobEndNotificationService>getService(
+                WorkflowJobEndNotificationService.SERVICE_NAME).registerListener(this);
+    }
+
+    @Override
+    public void destroy() throws FalconException {
+        // Nothing to release.
+    }
+
+    @Override
+    public void onSuccess(WorkflowExecutionContext context) throws FalconException {
+        onEnd(context, WorkflowJob.Status.SUCCEEDED);
+    }
+
+    @Override
+    public void onFailure(WorkflowExecutionContext context) throws FalconException {
+        onEnd(context, WorkflowJob.Status.FAILED);
+    }
+
+    @Override
+    public void onStart(WorkflowExecutionContext context) throws FalconException {
+        // Do nothing
+    }
+
+    @Override
+    public void onSuspend(WorkflowExecutionContext context) throws FalconException {
+        // Do nothing
+    }
+
+    @Override
+    public void onWait(WorkflowExecutionContext context) throws FalconException {
+        // Do nothing
+    }
+
+    // Builds the completion event for the job described by the context and delivers it to
+    // every registered handler. An exception from one handler aborts delivery to the rest.
+    private void onEnd(WorkflowExecutionContext context, WorkflowJob.Status status) throws FalconException {
+        JobCompletedEvent event = new JobCompletedEvent(constructCallbackID(context), status, getEndTime(context));
+        // Iterate over a snapshot taken under the list's monitor: a synchronized list does not
+        // protect iteration, and handlers should not be called while the lock is held.
+        List<NotificationHandler> handlers;
+        synchronized (listeners) {
+            handlers = new ArrayList<NotificationHandler>(listeners);
+        }
+        for (NotificationHandler handler : handlers) {
+            LOG.debug("Notifying {} with event {}", handler, event.getTarget());
+            handler.onEvent(event);
+        }
+    }
+
+    // Asks the DAG engine of the job's cluster for the end time of the workflow.
+    private DateTime getEndTime(WorkflowExecutionContext context) throws FalconException {
+        return new DateTime(DAGEngineFactory.getDAGEngine(context.getClusterName())
+                .info(context.getWorkflowId()).getEndTime());
+    }
+
+    // Constructs the callback ID from the details available in the context.
+    private ID constructCallbackID(WorkflowExecutionContext context) throws FalconException {
+        ID id = new ID(EntityType.valueOf(context.getEntityType()), context.getEntityName());
+        id.setCluster(context.getClusterName());
+        id.setInstanceTime(new DateTime(EntityUtil.parseDateUTC(context.getNominalTimeAsISO8601()), UTC));
+        return id;
+    }
+
+    // Copies every non-empty workflow argument found in the properties into a new context.
+    private WorkflowExecutionContext createContext(Properties props) {
+        // for backwards compatibility, read all args from properties
+        Map<WorkflowExecutionArgs, String> wfProperties = new HashMap<WorkflowExecutionArgs, String>();
+        for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
+            String optionValue = props.getProperty(arg.getName());
+            if (StringUtils.isNotEmpty(optionValue)) {
+                wfProperties.put(arg, optionValue);
+            }
+        }
+
+        return WorkflowExecutionContext.create(wfProperties);
+    }
+
+    /**
+     * Builds {@link JobCompletionNotificationRequest}.
+     * Cluster and external id are optional; {@link #build()} performs no validation.
+     */
+    public static class JobCompletionRequestBuilder extends RequestBuilder<JobCompletionNotificationRequest> {
+        private String cluster;
+        private String externalId;
+
+        public JobCompletionRequestBuilder(NotificationHandler handler, ID callbackID) {
+            super(handler, callbackID);
+        }
+
+        /**
+         * @param clusterName - The cluster on which the external job runs.
+         * @return This instance
+         */
+        public JobCompletionRequestBuilder setCluster(String clusterName) {
+            this.cluster = clusterName;
+            return this;
+        }
+
+        /**
+         * @param id - The external job id for which job completion notification is requested.
+         * @return This instance
+         */
+        public JobCompletionRequestBuilder setExternalId(String id) {
+            this.externalId = id;
+            return this;
+        }
+
+        @Override
+        public JobCompletionNotificationRequest build() {
+            return new JobCompletionNotificationRequest(handler, callbackId, cluster, externalId);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
 
b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
new file mode 100644
index 0000000..848f89c
--- /dev/null
+++ 
b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.impl;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.exception.NotificationServiceException;
+import org.apache.falcon.exception.StateStoreException;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.FalconNotificationService;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.event.JobScheduledEvent;
+import 
org.apache.falcon.notification.service.request.JobCompletionNotificationRequest;
+import 
org.apache.falcon.notification.service.request.JobScheduleNotificationRequest;
+import org.apache.falcon.notification.service.request.NotificationRequest;
+import org.apache.falcon.predicate.Predicate;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceState;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.apache.falcon.state.store.StateStore;
+import org.apache.falcon.util.ReflectionUtils;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.workflow.engine.DAGEngineFactory;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This notification service notifies {@link NotificationHandler} when an 
execution
+ * instance is scheduled on a DAG Engine.
+ * Current implementation of scheduler handles parallel scheduling of 
instances,
+ * dependencies (an instance depending on completion of another) and priority.
+ */
+public class SchedulerService implements FalconNotificationService, 
NotificationHandler,
+        RemovalListener<ID, List<ExecutionInstance>> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SchedulerService.class);
+
+    public static final String DEFAULT_NUM_OF_SCHEDULER_THREADS = "5";
+    public static final String NUM_OF_SCHEDULER_THREADS_PROP = 
"scheduler.threads.count";
+
+    // Once scheduling conditions are met, it goes to run queue to be run on 
DAGEngine, based on priority.
+    private ThreadPoolExecutor runQueue;
+
+    private static final StateStore STATE_STORE = AbstractStateStore.get();
+
+    private Cache<ID, Object> instancesToIgnore;
+    // TODO : limit the no. of awaiting instances per entity
+    private LoadingCache<ID, List<ExecutionInstance>> awaitedInstances;
+
+    /**
+     * Queues the execution instance carried by the request for scheduling.
+     * Any pending "ignore" marker for the instance is cleared first, so that an instance
+     * that was unregistered earlier can be scheduled again.
+     *
+     * @param notifRequest a {@link JobScheduleNotificationRequest}
+     * @throws NotificationServiceException if the request is null or carries no instance
+     */
+    @Override
+    public void register(NotificationRequest notifRequest) throws NotificationServiceException {
+        if (notifRequest == null) {
+            throw new NotificationServiceException("Request object cannot be null");
+        }
+        JobScheduleNotificationRequest request = (JobScheduleNotificationRequest) notifRequest;
+        if (request.getInstance() == null) {
+            throw new NotificationServiceException("Request must contain an instance.");
+        }
+        // When the instance is getting rescheduled for run. As in the case of suspend and resume.
+        // Invalidating an absent key is a no-op, so no prior lookup is needed.
+        instancesToIgnore.invalidate(request.getInstance().getId());
+        runQueue.execute(new InstanceRunner(request));
+    }
+
+    /**
+     * Marks the instance identified by the listener ID as one to be skipped when its
+     * runner executes. IDs without an instance time (entity IDs) are left alone.
+     */
+    @Override
+    public void unregister(NotificationHandler handler, ID listenerID) {
+        boolean isInstanceId = listenerID.getInstanceTime() != null;
+        if (isInstanceId) {
+            // Not efficient to iterate over elements to remove this. Add to ignore list.
+            instancesToIgnore.put(listenerID, new Object());
+        }
+    }
+
+    /**
+     * @return a new {@link JobScheduleRequestBuilder} for the given handler and callback ID
+     */
+    @Override
+    public RequestBuilder createRequestBuilder(NotificationHandler handler, ID 
callbackID) {
+        return new JobScheduleRequestBuilder(handler, callbackID);
+    }
+
+    /**
+     * @return the fixed name of this service, "JobSchedulerService"
+     *         (note that it differs from the class name)
+     */
+    @Override
+    public String getName() {
+        return "JobSchedulerService";
+    }
+
+    /**
+     * Sets up the priority-ordered run queue and the two caches, and subscribes this
+     * service to job completion events.
+     *
+     * @throws FalconException if the configured thread count is not a number, or if
+     *                         registering for job completion events fails
+     */
+    @Override
+    public void init() throws FalconException {
+        String configuredThreads = RuntimeProperties.get().getProperty(NUM_OF_SCHEDULER_THREADS_PROP,
+                DEFAULT_NUM_OF_SCHEDULER_THREADS);
+        int numThreads;
+        try {
+            numThreads = Integer.parseInt(configuredThreads);
+        } catch (NumberFormatException e) {
+            throw new FalconException("Invalid value '" + configuredThreads + "' for "
+                    + NUM_OF_SCHEDULER_THREADS_PROP, e);
+        }
+
+        // Uses a priority queue to ensure instances with higher priority gets run first.
+        PriorityBlockingQueue<Runnable> pq = new PriorityBlockingQueue<Runnable>(20, new PriorityComparator());
+        // The queue is unbounded, and a ThreadPoolExecutor only grows beyond its core size when
+        // the queue rejects a task. The core size therefore has to be the configured thread
+        // count; with a core size of 1 the pool would never use more than one thread.
+        runQueue = new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, pq);
+
+        // Loads the READY instances of the entity the given ID belongs to from the state store.
+        CacheLoader instanceCacheLoader = new CacheLoader<ID, Collection<ExecutionInstance>>() {
+            @Override
+            public Collection<ExecutionInstance> load(ID id) throws Exception {
+                List<InstanceState.STATE> states = new ArrayList<InstanceState.STATE>();
+                states.add(InstanceState.STATE.READY);
+                List<ExecutionInstance> readyInstances = new ArrayList<>();
+                // TODO : Limit it to no. of instances that can be run in parallel.
+                for (InstanceState state : STATE_STORE.getExecutionInstances(id.getEntityID(), states)) {
+                    readyInstances.add(state.getInstance());
+                }
+                return readyInstances;
+            }
+        };
+
+        awaitedInstances = CacheBuilder.newBuilder()
+                .maximumSize(100)
+                .concurrencyLevel(1)
+                .removalListener(this)
+                .build(instanceCacheLoader);
+
+        instancesToIgnore = CacheBuilder.newBuilder()
+                .expireAfterWrite(1, TimeUnit.HOURS)
+                .concurrencyLevel(1)
+                .build();
+        // Interested in all job completion events.
+        JobCompletionNotificationRequest completionRequest = (JobCompletionNotificationRequest)
+                NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.JOB_COMPLETION)
+                        .createRequestBuilder(this, null).build();
+        NotificationServicesRegistry.register(completionRequest);
+    }
+
+    /**
+     * Removal callback of the awaited-instances cache.
+     *
+     * @throws RuntimeException if an evicted instance cannot be written to the state store
+     */
+    @Override
+    public void onRemoval(RemovalNotification<ID, List<ExecutionInstance>> 
removalNotification) {
+        // When instances are removed due to size (eviction, as opposed to explicit invalidation),
+        // persist each of them in the state store in READY state. READY is the state the
+        // cache loader set up in init() queries for.
+        if (removalNotification.wasEvicted()) {
+            for (ExecutionInstance instance : removalNotification.getValue()) {
+                InstanceState state = new InstanceState(instance);
+                state.setCurrentState(InstanceState.STATE.READY);
+                try {
+                    STATE_STORE.updateExecutionInstance(state);
+                } catch (StateStoreException e) {
+                    throw new RuntimeException("Unable to persist the ready 
instance " + instance.getId(), e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Reacts to job completion events by re-queuing the first instance that is awaiting
+     * the completed job (or its entity). Events from any other source are ignored.
+     *
+     * @throws FalconException wrapping any exception raised while handling the event
+     */
+    @Override
+    public void onEvent(Event event) throws FalconException {
+        // Interested only in job completion events.
+        if (event.getSource() == 
NotificationServicesRegistry.SERVICE.JOB_COMPLETION) {
+            try {
+                // Check if the instance is awaited.
+                // NOTE(review): awaitedInstances is a LoadingCache, so get() presumably invokes the
+                // loader from init() for an absent key rather than returning null. If so, the
+                // entity-level fallback below is never taken. Confirm whether getIfPresent() was intended.
+                ID id = event.getTarget();
+                List<ExecutionInstance> instances = awaitedInstances.get(id);
+                // Else, check if the entity is awaited.
+                if (instances == null) {
+                    id = id.getEntityID();
+                    instances = awaitedInstances.get(id);
+                }
+                if (instances != null && !instances.isEmpty()) {
+                    // Only the first awaiting instance is considered per event.
+                    ExecutionInstance instance = instances.get(0);
+                    if (instance != null && instance.getAwaitingPredicates() 
!= null) {
+                        // NOTE(review): a runner is queued for every JOB_COMPLETION predicate of the
+                        // instance, so an instance with several such predicates is queued several times.
+                        for (Predicate predicate : 
instance.getAwaitingPredicates()) {
+                            if (predicate.getType() == 
Predicate.TYPE.JOB_COMPLETION) {
+                                // Construct a request object. The handler is instantiated from the
+                                // class name stored in the predicate's "handler" clause.
+                                NotificationHandler handler = ReflectionUtils
+                                        
.getInstanceByClassName(predicate.getClauseValue("handler").toString());
+                                JobScheduleRequestBuilder requestBuilder = new 
JobScheduleRequestBuilder(
+                                        handler, instance.getId());
+                                requestBuilder.setInstance(instance);
+                                InstanceRunner runner = new 
InstanceRunner(requestBuilder.build());
+                                // Since an instance of the same entity just finished, allow one more.
+                                // NOTE(review): this compares the event ID with the awaiting instance's
+                                // own ID, not with its entity ID. Verify this is the intended check.
+                                if (id.equals(instance.getId())) {
+                                    runner.incrementAllowedInstances();
+                                }
+                                runQueue.execute(runner);
+                                instances.remove(instance);
+                            }
+                        }
+                    }
+                }
+                // Drop the cache entry once nothing is awaiting this ID any more.
+                if (instances != null && instances.isEmpty()) {
+                    awaitedInstances.invalidate(id);
+                }
+            } catch (Exception e) {
+                throw new FalconException(e);
+            }
+        }
+    }
+
+    /**
+     * Shuts the run queue down immediately and clears the ignore list.
+     * The awaited-instances cache is left untouched.
+     */
+    @Override
+    public void destroy() throws FalconException {
+        runQueue.shutdownNow();
+        instancesToIgnore.invalidateAll();
+    }
+
+    // Tells the handler of the request that scheduling failed, using the request's callback ID.
+    private void notifyFailureEvent(JobScheduleNotificationRequest request) throws FalconException {
+        JobScheduledEvent failure = new JobScheduledEvent(request.getCallbackId(), JobScheduledEvent.STATUS.FAILED);
+        NotificationHandler requester = request.getHandler();
+        requester.onEvent(failure);
+    }
+
+    /**
+     * Task that schedules one execution instance on the DAG engine once its scheduling
+     * conditions (parallelism limit and dependencies) are met, and reports the outcome
+     * to the handler of the originating request.
+     */
+    private class InstanceRunner implements Runnable {
+        private final ExecutionInstance instance;
+        private final JobScheduleNotificationRequest request;
+        private final short priority;
+        private int allowedParallelInstances = 1;
+
+        /**
+         * @param request request carrying the instance to run; priority and allowed
+         *                parallelism are read from the instance's entity
+         * @throws UnsupportedOperationException if the entity is not a process
+         */
+        public InstanceRunner(JobScheduleNotificationRequest request) {
+            this.request = request;
+            this.instance = request.getInstance();
+            this.priority = getPriority(instance.getEntity()).getPriority();
+            allowedParallelInstances = EntityUtil.getParallel(instance.getEntity());
+        }
+
+        /**
+         * Raises the parallelism limit used by this runner by one.
+         * @return the new limit
+         */
+        public int incrementAllowedInstances() {
+            return ++allowedParallelInstances;
+        }
+
+        // Only processes carry a priority; any other entity type is rejected.
+        private EntityUtil.JOBPRIORITY getPriority(Entity entity) {
+            switch(entity.getEntityType()) {
+            case PROCESS :
+                return EntityUtil.getPriority((Process)entity);
+            default :
+                throw new UnsupportedOperationException("Scheduling of entities other "
+                        + "than process is not supported yet.");
+            }
+        }
+
+        /**
+         * Runs the instance if it has not been de-registered and its conditions are met.
+         * On success the handler receives a SUCCESSFUL {@link JobScheduledEvent} with the
+         * external id and start time; on a {@link FalconException} it receives a FAILED one.
+         * When the conditions are not met nothing is sent; the instance is parked instead.
+         */
+        @Override
+        public void run() {
+            try {
+                // If de-registered
+                if (instancesToIgnore.getIfPresent(instance.getId()) != null) {
+                    LOG.debug("Instance {} has been deregistered. Ignoring.", instance.getId());
+                    instancesToIgnore.invalidate(instance.getId());
+                    return;
+                }
+                LOG.debug("Received request to run instance {}", instance.getId());
+                if (checkConditions()) {
+                    // If instance not already scheduled.
+                    String externalId = instance.getExternalID();
+                    if (externalId == null) {
+                        externalId = DAGEngineFactory.getDAGEngine(instance.getCluster()).run(instance);
+                        LOG.info("Scheduled job {} for instance {}", externalId, instance.getId());
+                    }
+                    JobScheduledEvent event = new JobScheduledEvent(instance.getId(),
+                            JobScheduledEvent.STATUS.SUCCESSFUL);
+                    event.setExternalID(externalId);
+                    event.setStartTime(new DateTime(DAGEngineFactory.getDAGEngine(instance.getCluster())
+                            .info(externalId).getStartTime()));
+                    request.getHandler().onEvent(event);
+                }
+            } catch (FalconException e) {
+                LOG.error("Error running the instance : " + instance.getId(), e);
+                try {
+                    notifyFailureEvent(request);
+                } catch (FalconException fe) {
+                    throw new RuntimeException("Unable to onEvent : " + request.getCallbackId(), fe);
+                }
+            }
+        }
+
+        /**
+         * @return the priority value of the instance's entity, used to order the run queue
+         */
+        public short getPriority() {
+            return priority;
+        }
+
+        // Returns true when the instance may run now. Otherwise the instance is recorded as
+        // awaiting the completion of another instance of its entity and false is returned.
+        // Any exception is logged and rethrown as a FalconException.
+        private boolean checkConditions() throws FalconException {
+            try {
+                // TODO : If and when the no. of scheduling conditions increase, consider chaining condition checks.
+                // Run if all conditions are met.
+                // dependencyCheck() is only evaluated when the parallelism check passes.
+                if (instanceCheck() && dependencyCheck()) {
+                    return true;
+                } else {
+                    ID entityID = instance.getId().getEntityID();
+                    // Instance is awaiting scheduling conditions to be met. Add predicate to that effect.
+                    instance.getAwaitingPredicates().add(Predicate.createJobCompletionPredicate(request.getHandler(),
+                            entityID));
+                    updateAwaitedInstances(entityID);
+                    LOG.debug("Schedule conditions not met for instance {}. Awaiting on {}",
+                            instance.getId(), entityID);
+                }
+            } catch (Exception e) {
+                LOG.error("Instance run failed with error : ", e);
+                throw new FalconException("Instance run failed", e);
+            }
+            return false;
+        }
+
+        // Appends this runner's instance to the list of instances awaiting the given ID.
+        private void updateAwaitedInstances(ID id) throws ExecutionException {
+            // Lock on the shared cache rather than on the ID argument: the cache matches keys
+            // by equality, so two equal but distinct ID objects address the same list and
+            // must exclude each other.
+            synchronized (awaitedInstances) {
+                List<ExecutionInstance> instances = awaitedInstances.get(id);
+                if (instances == null) {
+                    // Order is FIFO.
+                    instances = new LinkedList<>();
+                    awaitedInstances.put(id, instances);
+                }
+                instances.add(instance);
+            }
+        }
+
+        // Returns true when the request names no dependencies. Otherwise registers the instance
+        // as awaiting each dependency and returns false.
+        // NOTE(review): the completion state of the dependencies is not inspected here, so a
+        // request with dependencies never passes this check. Confirm that re-queuing via
+        // onEvent (which builds a request without dependencies) is the intended way forward.
+        private boolean dependencyCheck() throws FalconException, ExecutionException {
+            if (request.getDependencies() == null || request.getDependencies().isEmpty()) {
+                return true;
+            }
+
+            for (ExecutionInstance execInstance : request.getDependencies()) {
+                // Dependants should wait for this instance to complete. Add predicate to that effect.
+                instance.getAwaitingPredicates().add(Predicate.createJobCompletionPredicate(
+                        request.getHandler(), execInstance.getId()));
+                updateAwaitedInstances(execInstance.getId());
+            }
+            return false;
+        }
+
+        // Ensure no. of instances running in parallel is per entity specification.
+        private boolean instanceCheck() throws StateStoreException {
+            return STATE_STORE.getExecutionInstances(instance.getEntity(), instance.getCluster(),
+                    InstanceState.getRunningStates()).size() < allowedParallelInstances;
+        }
+    }
+
+    /**
+     * Orders {@link InstanceRunner}s by ascending priority value, for use by the
+     * {@link java.util.concurrent.PriorityBlockingQueue} backing the run queue.
+     */
+    private static class PriorityComparator<T extends InstanceRunner> implements Comparator<T>, Serializable {
+        @Override
+        public int compare(T o1, T o2) {
+            // Priorities are shorts, so the difference cannot overflow an int.
+            int first = o1.getPriority();
+            int second = o2.getPriority();
+            return first - second;
+        }
+    }
+
+    /**
+     * Builds {@link JobScheduleNotificationRequest}.
+     * The callback ID (given at construction) and the instance are mandatory;
+     * dependencies are optional.
+     */
+    public static class JobScheduleRequestBuilder extends RequestBuilder<JobScheduleNotificationRequest> {
+        private List<ExecutionInstance> dependencies;
+        private ExecutionInstance instance;
+
+        public JobScheduleRequestBuilder(NotificationHandler handler, ID callbackID) {
+            super(handler, callbackID);
+        }
+
+        /**
+         * @param execInstance that needs to be scheduled
+         * @return This instance
+         */
+        public JobScheduleRequestBuilder setInstance(ExecutionInstance execInstance) {
+            this.instance = execInstance;
+            return this;
+        }
+
+        /**
+         * Dependencies to wait for before scheduling.
+         * @param dependencies instances that must complete first; may be null or empty
+         * @return This instance, so that calls can be chained like the other setters
+         */
+        public JobScheduleRequestBuilder setDependencies(List<ExecutionInstance> dependencies) {
+            this.dependencies = dependencies;
+            return this;
+        }
+
+        /**
+         * @return a request for the configured handler, callback ID, instance and dependencies
+         * @throws IllegalArgumentException if the callback ID or the instance is missing
+         */
+        @Override
+        public JobScheduleNotificationRequest build() {
+            if (callbackId == null  || instance == null) {
+                throw new IllegalArgumentException("Missing one or more of the mandatory arguments:"
+                        + " callbackId, execInstance");
+            }
+            return new JobScheduleNotificationRequest(handler, callbackId, instance, dependencies);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/request/AlarmRequest.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/notification/service/request/AlarmRequest.java
 
b/scheduler/src/main/java/org/apache/falcon/notification/service/request/AlarmRequest.java
new file mode 100644
index 0000000..2628dc8
--- /dev/null
+++ 
b/scheduler/src/main/java/org/apache/falcon/notification/service/request/AlarmRequest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.request;
+
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+import org.joda.time.DateTime;
+
+import java.util.TimeZone;
+
+/**
+ * Request intended for {@link org.apache.falcon.notification.service.impl.AlarmService}
+ * for time based notifications.
+ * All timer settings are supplied through the constructor and are read-only afterwards.
+ * TODO : Might need a separate builder too.
+ */
+public class AlarmRequest extends NotificationRequest {
+
+    private final DateTime startTime;
+    private final DateTime endTime;
+    private final Frequency frequency;
+    private final TimeZone timeZone;
+
+    /**
+     * Constructor. The request is always addressed to the TIME service.
+     * @param notifHandler handler to be notified
+     * @param callbackId ID identifying the requester
+     * @param start start time of the timer
+     * @param end end time of the timer
+     * @param freq frequency of the timer
+     * @param tz timezone of the request
+     */
+    public AlarmRequest(NotificationHandler notifHandler, ID callbackId, DateTime start,
+                        DateTime end, Frequency freq, TimeZone tz) {
+        this.handler = notifHandler;
+        this.callbackId = callbackId;
+        this.service = NotificationServicesRegistry.SERVICE.TIME;
+        this.startTime = start;
+        this.endTime = end;
+        this.frequency = freq;
+        this.timeZone = tz;
+    }
+
+    /**
+     * @return frequency of the timer
+     */
+    public Frequency getFrequency() {
+        return frequency;
+    }
+
+    /**
+     * @return start time of the timer
+     */
+    public DateTime getStartTime() {
+        return startTime;
+    }
+
+    /**
+     * @return end time of the timer
+     */
+    public DateTime getEndTime() {
+        return endTime;
+    }
+
+    /**
+     * @return timezone of the request.
+     */
+    public TimeZone getTimeZone() {
+        return timeZone;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java
 
b/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java
new file mode 100644
index 0000000..8393de0
--- /dev/null
+++ 
b/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.request;
+
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Request intended for {@link 
org.apache.falcon.notification.service.impl.DataAvailabilityService}
+ * for data notifications.
+ * The setter methods of the class support chaining similar to a builder class.
+ * TODO : Complete/modify this skeletal class
+ */
+public class DataNotificationRequest extends NotificationRequest {
+    private final Path dataLocation;
+    private String cluster;
+
+    /**
+     * @return data location to be watched.
+     */
+    public Path getDataLocation() {
+        return dataLocation;
+    }
+
+    /**
+     * Given a number of instances, should the service wait for exactly those 
many,
+     * at least those many or at most those many instances.
+     */
+    public enum INSTANCELIMIT {
+        EXACTLY_N,
+        AT_LEAST_N,
+        AT_MOST_N
+    }
+
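+    /**
+     * Constructor.
+     * @param notifHandler the notification handler
+     * @param callbackId ID of the entity to be notified
+     * @param location data location to be watched
+     */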
+    public DataNotificationRequest(NotificationHandler notifHandler, ID 
callbackId, Path location) {
+        this.handler = notifHandler;
+        this.callbackId = callbackId;
+        this.dataLocation = location;
+        this.service = NotificationServicesRegistry.SERVICE.DATA;
+    }
+
+    /**
+     * @return cluster name
+     */
+    public String getCluster() {
+        return cluster;
+    }
+
+    /**
+     * @param clusterName
+     * @return This instance
+     */
+    public DataNotificationRequest setCluster(String clusterName) {
+        this.cluster = clusterName;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobCompletionNotificationRequest.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobCompletionNotificationRequest.java
 
b/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobCompletionNotificationRequest.java
new file mode 100644
index 0000000..1d35476
--- /dev/null
+++ 
b/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobCompletionNotificationRequest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.request;
+
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+
+/**
+ * Request intended for {@link 
org.apache.falcon.notification.service.impl.JobCompletionService}
+ * for job completion notifications.
+ * All values are supplied through the constructor; the class exposes only getters.
+ */
+public class JobCompletionNotificationRequest extends NotificationRequest {
+
+    private String externalId;
+    private String cluster;
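+    /**
+     * Constructor.
+     * @param notifHandler the notification handler
+     * @param callbackId ID of the entity to be notified
+     * @param clstr cluster name
+     * @param jobId external job id for which job completion notification is requested
+     */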
+    public JobCompletionNotificationRequest(NotificationHandler notifHandler, 
ID callbackId, String clstr,
+                                            String jobId) {
+        this.handler = notifHandler;
+        this.service = NotificationServicesRegistry.SERVICE.JOB_COMPLETION;
+        this.callbackId = callbackId;
+        this.cluster = clstr;
+        this.externalId = jobId;
+    }
+
+    /**
+     * @return - The external job id for which job completion notification is 
requested.
+     */
+    public String getExternalId() {
+        return externalId;
+    }
+
+    /**
+     * @return cluster name
+     */
+    public String getCluster() {
+        return cluster;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobScheduleNotificationRequest.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobScheduleNotificationRequest.java
 
b/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobScheduleNotificationRequest.java
new file mode 100644
index 0000000..80133bd
--- /dev/null
+++ 
b/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobScheduleNotificationRequest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.request;
+
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+
+import java.util.List;
+
+/**
+ * Request intended for {@link 
org.apache.falcon.notification.service.impl.SchedulerService}
+ * for job run notifications.
+ * All values are supplied through the constructor; the class exposes only getters.
+ */
+public class JobScheduleNotificationRequest extends NotificationRequest {
+    private ExecutionInstance instance;
+    private List<ExecutionInstance> dependencies;
+
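+    /**
+     * Constructor.
+     * @param notifHandler the notification handler
+     * @param id ID of the entity to be notified
+     * @param inst execution instance that will be scheduled
+     * @param deps execution instances stored as the dependencies of this request
+     */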
+    public JobScheduleNotificationRequest(NotificationHandler notifHandler, ID 
id, ExecutionInstance inst,
+                                          List<ExecutionInstance> deps) {
+        this.handler = notifHandler;
+        this.service = NotificationServicesRegistry.SERVICE.JOB_SCHEDULE;
+        this.callbackId = id;
+        this.instance = inst;
+        this.dependencies = deps;
+    }
+
+    /**
+     * @return execution instance that will be scheduled.
+     */
+    public ExecutionInstance getInstance() {
+        return instance;
+    }
+
+    public List<ExecutionInstance> getDependencies() {
+        return dependencies;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/request/NotificationRequest.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/notification/service/request/NotificationRequest.java
 
b/scheduler/src/main/java/org/apache/falcon/notification/service/request/NotificationRequest.java
new file mode 100644
index 0000000..c89668d
--- /dev/null
+++ 
b/scheduler/src/main/java/org/apache/falcon/notification/service/request/NotificationRequest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.request;
+
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+
+/**
+ * An abstract class that all notification requests of services must extend.
+ * TODO : Complete/modify this skeleton class
+ */
+public abstract class NotificationRequest {
+    protected NotificationHandler handler;
+    protected ID callbackId;
+    protected NotificationServicesRegistry.SERVICE service;
+
+    /**
+     * @return - The service that this request is intended for
+     */
+    public NotificationServicesRegistry.SERVICE getService() {
+        return service;
+    }
+
+    /**
+     * @return - The entity that needs to be notified when this request is 
satisfied.
+     */
+    public ID getCallbackId() {
+        return callbackId;
+    }
+
+    /**
+     * @return - The notification handler.
+     */
+    public NotificationHandler getHandler() {
+        return handler;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java 
b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
new file mode 100644
index 0000000..fb4ce82
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.predicate;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.notification.service.event.DataEvent;
+import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.event.TimeElapsedEvent;
+import org.apache.falcon.state.ID;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Represents the gating condition for which an instance is waiting before it 
is scheduled.
+ * This will be serialized and stored in state store.
+ */
+public class Predicate implements Serializable {
+    /**
+     * Type of predicate: data, time or job completion.
+     */
+    public enum TYPE {
+        DATA,
+        TIME,
+        JOB_COMPLETION
+    }
+
+    private final TYPE type;
+
+    // Key-value pairs of the clauses that make up this predicate.
+    private Map<String, Comparable> clauses = new HashMap<String, 
Comparable>();
+
+    // A generic "any" object that can be used when a particular key is 
allowed to have any value.
+    public static final Comparable<? extends Serializable> ANY = new Any();
+
+    /**
+     * @return type of predicate
+     */
+    public TYPE getType() {
+        return type;
+    }
+
+    /**
+     * @param key
+     * @return the value corresponding to the key
+     */
+    public Comparable getClauseValue(String key) {
+        return clauses.get(key);
+    }
+
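+    /**
+     * Compares this predicate with the supplied predicate.
+     * Only the clauses of the supplied predicate are checked; clauses that are
+     * present only in this predicate are ignored.
+     *
+     * @param suppliedPredicate predicate to compare against; must not be null
+     * @return true, if the types are the same and every clause of the supplied
+     *         predicate matches this predicate. false, otherwise.
+     */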
+    public boolean evaluate(Predicate suppliedPredicate) {
+        if (type != suppliedPredicate.getType()) {
+            return false;
+        }
+        boolean eval = true;
+        // Iterate over each clause and ensure it matches the clauses of this 
predicate.
+        for (Map.Entry<String, Comparable> entry : 
suppliedPredicate.getClauses().entrySet()) {
+            eval = eval && matches(entry.getKey(), entry.getValue());
+            if (!eval) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    // Compares the two values of a key.
+    private boolean matches(String lhs, Comparable<? extends Serializable> 
rhs) {
+        if (clauses.containsKey(lhs) && clauses.get(lhs) != null
+                && rhs != null) {
+            if (clauses.get(lhs).equals(ANY) || rhs.equals(ANY)) {
+                return true;
+            } else {
+                return clauses.get(lhs).compareTo(rhs) == 0;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * @param type of predicate
+     */
+    public Predicate(TYPE type) {
+        this.type = type;
+    }
+
+    /**
+     * @return the name-value pairs that make up the clauses of this predicate.
+     */
+    public Map<String, Comparable> getClauses() {
+        return clauses;
+    }
+
+    /**
+     * @param lhs - The key in the key-value pair of a clause
+     * @param rhs - The value in the key-value pair of a clause
+     * @return This instance
+     */
+    public Predicate addClause(String lhs, Comparable<? extends Serializable> 
rhs) {
+        clauses.put(lhs, rhs);
+        return this;
+    }
+
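+    /**
+     * Creates a Predicate of Type TIME.
+     *
+     * @param start start time; a negative value is replaced by {@link #ANY}
+     * @param end end time; a negative value is replaced by {@link #ANY}
+     * @param instanceTime instance time; a negative value is replaced by {@link #ANY}
+     * @return a TIME predicate with the clauses "start", "end" and "instanceTime"
+     */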
+    public static Predicate createTimePredicate(long start, long end, long 
instanceTime) {
+        return new Predicate(TYPE.TIME)
+                .addClause("start", (start < 0) ? ANY : start)
+                .addClause("end", (end < 0) ? ANY : end)
+                .addClause("instanceTime", (instanceTime < 0) ? ANY : 
instanceTime);
+    }
+
+    /**
+     * Creates a predicate of type DATA.
+     *
+     * @param location location supplying the "path" and "type" clauses; if null, both are {@link #ANY}
+     * @return a DATA predicate
+     */
+    public static Predicate createDataPredicate(Location location) {
+        return new Predicate(TYPE.DATA)
+                .addClause("path", (location == null) ? ANY : 
location.getPath())
+                .addClause("type", (location == null) ? ANY : 
location.getType());
+    }
+
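+    /**
+     * Creates a predicate of type JOB_COMPLETION.
+     *
+     * @param handler handler whose class name becomes the "handler" clause
+     * @param id id whose string form becomes the "instanceId" clause
+     * @return a JOB_COMPLETION predicate
+     */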
+    public static Predicate createJobCompletionPredicate(NotificationHandler 
handler, ID id) {
+        return new Predicate(TYPE.JOB_COMPLETION)
+                .addClause("instanceId", id.toString())
+                .addClause("handler", handler.getClass().getName());
+    }
+
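+    /**
+     * Creates a predicate from an event based on the event source and values in the event.
+     * Only events whose source is the DATA or the TIME service are handled.
+     *
+     * @param event event to convert
+     * @return a DATA or TIME predicate built from the values in the event
+     * @throws FalconException if the event lacks the required values or comes from any other service
+     */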
+    public static Predicate getPredicate(Event event) throws FalconException {
+        if (event.getSource() == NotificationServicesRegistry.SERVICE.DATA) {
+            DataEvent dataEvent = (DataEvent) event;
+            if (dataEvent.getDataLocation() != null && dataEvent.getDataType() 
!= null) {
+                Location loc = new Location();
+                loc.setPath(dataEvent.getDataLocation().toString());
+                loc.setType(dataEvent.getDataType());
+                return createDataPredicate(loc);
+            } else {
+                throw new FalconException("Event does not have enough data to 
create a predicate");
+            }
+        } else if (event.getSource() == 
NotificationServicesRegistry.SERVICE.TIME) {
+            TimeElapsedEvent timeEvent = (TimeElapsedEvent) event;
+            if (timeEvent.getStartTime() != null && timeEvent.getEndTime() != 
null) {
+                long instanceTime = (timeEvent.getInstanceTime() == null)? -1 
: timeEvent.getInstanceTime().getMillis();
+                return 
Predicate.createTimePredicate(timeEvent.getStartTime().getMillis(),
+                        timeEvent.getEndTime().getMillis(), instanceTime);
+            } else {
+                throw new FalconException("Event does not have enough data to 
create a predicate");
+            }
+
+        } else {
+            throw new FalconException("Unhandled event type " + 
event.getSource());
+        }
+    }
+
+    /**
+     * A wildcard clause value: it compares as '0' to any other object and is
+     * equal to every other instance of this class.
+     * <p>
+     * Predicates are serialized into the state store, and Java deserialization
+     * creates a new Any object rather than returning {@link #ANY}. Equality is
+     * therefore based on the class, not on identity, so that the equals(ANY)
+     * checks done while matching clauses still recognise a restored wildcard.
+     */
+    private static class Any implements Comparable, Serializable {
+        @Override
+        public int compareTo(Object o) {
+            // Neither smaller nor larger than anything: matches every value.
+            return 0;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return o instanceof Any;
+        }
+
+        @Override
+        public int hashCode() {
+            // Constant, to stay consistent with equals(): all instances are equal.
+            return 0;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java 
b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
new file mode 100644
index 0000000..15aea9a
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.exception.InvalidStateTransitionException;
+
+/**
+ * Represents the state of a schedulable entity.
+ * Implements {@link org.apache.falcon.state.StateMachine} for an entity.
+ */
+public class EntityState implements StateMachine<EntityState.STATE, 
EntityState.EVENT> {
+    private Entity entity;
+    private STATE currentState;
+    private static final STATE INITIAL_STATE = STATE.SUBMITTED;
+
+    /**
+     * Enumerates all the valid states of a schedulable entity and the valid 
transitions from that state.
+     */
+    public enum STATE implements StateMachine<EntityState.STATE, 
EntityState.EVENT> {
+        SUBMITTED {
+            @Override
+            public STATE nextTransition(EVENT event) throws 
InvalidStateTransitionException {
+                switch (event) {
+                case SCHEDULE:
+                    return STATE.SCHEDULED;
+                case SUBMIT:
+                    return this;
+                default:
+                    throw new InvalidStateTransitionException("Submitted 
entities can only be scheduled.");
+                }
+            }
+        },
+        SCHEDULED {
+            @Override
+            public STATE nextTransition(EVENT event) throws 
InvalidStateTransitionException {
+                switch (event) {
+                case SUSPEND:
+                    return STATE.SUSPENDED;
+                case SCHEDULE:
+                    return this;
+                default:
+                    throw new InvalidStateTransitionException("Scheduled 
entities can only be suspended.");
+                }
+            }
+        },
+        SUSPENDED {
+            @Override
+            public STATE nextTransition(EVENT event) throws 
InvalidStateTransitionException {
+                switch (event) {
+                case RESUME:
+                    return STATE.SCHEDULED;
+                case SUSPEND:
+                    return this;
+                default:
+                    throw new InvalidStateTransitionException("Suspended 
entities can only be resumed.");
+                }
+            }
+        }
+    }
+
+    /**
+     * Enumerates all the valid events that can cause a state transition.
+     */
+    public enum EVENT {
+        SUBMIT,
+        SCHEDULE,
+        SUSPEND,
+        RESUME
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param e - Entity
+     */
+    public EntityState(Entity e) {
+        this.entity = e;
+        currentState = INITIAL_STATE;
+    }
+
+    /**
+     * @return - The entity
+     */
+    public Entity getEntity() {
+        return entity;
+    }
+
+    /**
+     * @param e - entity
+     * @return - This instance
+     */
+    public EntityState setEntity(Entity e) {
+        this.entity = e;
+        return this;
+    }
+
+    /**
+     * @return - Current state of the entity.
+     */
+    public STATE getCurrentState() {
+        return currentState;
+    }
+
+    /**
+     * @param state
+     * @return - This instance
+     */
+    public EntityState setCurrentState(STATE state) {
+        this.currentState = state;
+        return this;
+    }
+
+    @Override
+    public STATE nextTransition(EVENT event) throws 
InvalidStateTransitionException {
+        return currentState.nextTransition(event);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java 
b/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java
new file mode 100644
index 0000000..44ec3fc
--- /dev/null
+++ 
b/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.Entity;
+
+/**
+ * Any handler interested in handling state changes of entities must implement 
this interface.
+ */
+public interface EntityStateChangeHandler {
+
+    /**
+     * Invoked when an entity is submitted.
+     *
+     * @param entity
+     * @throws FalconException
+     */
+    void onSubmit(Entity entity) throws FalconException;
+
+    /**
+     * Invoked when an entity is scheduled.
+     *
+     * @param entity
+     * @throws FalconException
+     */
+    void onSchedule(Entity entity) throws FalconException;
+
+    /**
+     * Invoked when an entity is suspended.
+     *
+     * @param entity
+     * @throws FalconException
+     */
+    void onSuspend(Entity entity) throws FalconException;
+
+    /**
+     * Invoked when an entity is resumed.
+     *
+     * @param entity
+     * @throws FalconException
+     */
+    void onResume(Entity entity) throws FalconException;
+}

Reply via email to