Repository: falcon Updated Branches: refs/heads/master 36a7d6b8b -> aa18bfdae
FALCON-1592 Code Refactoring: Introduce Event type for scheduler events (Ajay Yadava) Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/aa18bfda Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/aa18bfda Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/aa18bfda Branch: refs/heads/master Commit: aa18bfdaefd7f771f45ca9314beb0ff8ac4d53f2 Parents: 36a7d6b Author: Pallavi Rao <[email protected]> Authored: Mon Nov 9 17:37:44 2015 +0530 Committer: Pallavi Rao <[email protected]> Committed: Mon Nov 9 17:37:44 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../execution/ProcessExecutionInstance.java | 8 +++--- .../falcon/execution/ProcessExecutor.java | 15 +++++----- .../notification/service/event/DataEvent.java | 9 ++---- .../notification/service/event/Event.java | 15 +++++----- .../notification/service/event/EventType.java | 30 ++++++++++++++++++++ .../service/event/JobCompletedEvent.java | 9 ++---- .../service/event/JobScheduledEvent.java | 9 ++---- .../service/event/TimeElapsedEvent.java | 9 ++---- .../service/impl/SchedulerService.java | 3 +- .../org/apache/falcon/predicate/Predicate.java | 8 +++--- 11 files changed, 66 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f87c7cd..586847e 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -14,6 +14,8 @@ Trunk (Unreleased) FALCON-1213 Base framework of the native scheduler(Pallavi Rao) IMPROVEMENTS + FALCON-1592 Code Refactoring: Introduce Event type for scheduler events (Ajay Yadava via Pallavi Rao) + FALCON-1593 Oozie setup failing in setup phase (Praveen Adlakha via Ajay Yadava) FALCON-1582 Documentation for globally disabling retries (Pallavi Rao) http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java index 19089c4..8c84f2b 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java @@ -135,16 +135,16 @@ public class ProcessExecutionInstance extends ExecutionInstance { @Override public void onEvent(Event event) throws FalconException { - switch (event.getSource()) { - case JOB_SCHEDULE: + switch (event.getType()) { + case JOB_SCHEDULED: JobScheduledEvent jobScheduleEvent = (JobScheduledEvent) event; setExternalID(jobScheduleEvent.getExternalID()); setActualStart(jobScheduleEvent.getStartTime()); break; - case JOB_COMPLETION: + case JOB_COMPLETED: setActualEnd(((JobCompletedEvent)event).getEndTime()); break; - case DATA: + case DATA_AVAILABLE: // Data has not become available and the wait time has passed if (((DataEvent) event).getStatus() == DataEvent.STATUS.UNAVAILABLE) { if (getTimeOutInMillis() <= (System.currentTimeMillis() - getCreationTime().getMillis())) { http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java index 68c34e7..d10d2fd 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java @@ -29,6 +29,7 @@ import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.exception.InvalidStateTransitionException; import org.apache.falcon.notification.service.NotificationServicesRegistry; import org.apache.falcon.notification.service.event.Event; +import org.apache.falcon.notification.service.event.EventType; import org.apache.falcon.notification.service.event.JobCompletedEvent; import org.apache.falcon.notification.service.event.TimeElapsedEvent; import org.apache.falcon.notification.service.impl.JobCompletionService; @@ -271,7 +272,7 @@ public class ProcessExecutor extends EntityExecutor { private ProcessExecutionInstance buildInstance(Event event) throws FalconException { // If a time triggered instance, use nominal time from event - if (event.getSource() == NotificationServicesRegistry.SERVICE.TIME) { + if (event.getType() == EventType.TIME_ELAPSED) { TimeElapsedEvent timeEvent = (TimeElapsedEvent) event; LOG.debug("Creating a new process instance for nominal time {}.", timeEvent.getInstanceTime()); return new ProcessExecutionInstance(process, timeEvent.getInstanceTime(), cluster); @@ -299,7 +300,7 @@ public class ProcessExecutor extends EntityExecutor { } } } catch (Exception e) { - throw new FalconException("Unable to handle event with source : " + event.getSource() + " with target:" + throw new FalconException("Unable to handle event of type : " + event.getType() + " with target:" + event.getTarget(), e); } } @@ -307,14 +308,14 @@ public class ProcessExecutor extends EntityExecutor { private void handleEvent(Event event) throws FalconException { ProcessExecutionInstance instance; try { - switch (event.getSource()) { + switch (event.getType()) { // TODO : Handle cases where scheduling fails. - case JOB_SCHEDULE: + case JOB_SCHEDULED: instance = instances.get(event.getTarget()); instance.onEvent(event); stateService.handleStateChange(instance, InstanceState.EVENT.SCHEDULE, this); break; - case JOB_COMPLETION: + case JOB_COMPLETED: instance = instances.get(event.getTarget()); instance.onEvent(event); switch (((JobCompletedEvent) event).getStatus()) { @@ -395,8 +396,8 @@ public class ProcessExecutor extends EntityExecutor { // Or, if it is job run or job complete notifications, so it can handle the instance's state transition. private boolean shouldHandleEvent(Event event) { return event.getTarget().equals(id) - || event.getSource() == NotificationServicesRegistry.SERVICE.JOB_COMPLETION - || event.getSource() == NotificationServicesRegistry.SERVICE.JOB_SCHEDULE; + || event.getType() == EventType.JOB_COMPLETED + || event.getType() == EventType.JOB_SCHEDULED; } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java index 4883fe7..1036339 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java @@ -19,7 +19,6 @@ package org.apache.falcon.notification.service.event; import org.apache.falcon.entity.v0.feed.LocationType; -import org.apache.falcon.notification.service.NotificationServicesRegistry; import org.apache.falcon.state.ID; import org.apache.hadoop.fs.Path; @@ -27,7 +26,7 @@ import org.apache.hadoop.fs.Path; * An event generated by {@link org.apache.falcon.notification.service.impl.DataAvailabilityService} * indicating availability or non-availability of a dataset. */ -public class DataEvent implements Event { +public class DataEvent extends Event { private final ID callbackID; private Path dataLocation; private LocationType dataType; @@ -46,6 +45,7 @@ public class DataEvent implements Event { this.dataLocation = location; this.dataType = locType; this.status = availability; + this.type = EventType.DATA_AVAILABLE; } public STATUS getStatus() { @@ -65,11 +65,6 @@ public class DataEvent implements Event { } @Override - public NotificationServicesRegistry.SERVICE getSource() { - return NotificationServicesRegistry.SERVICE.DATA; - } - - @Override public ID getTarget() { return callbackID; } http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java index 140973b..e162b48 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java @@ -17,21 +17,22 @@ */ package org.apache.falcon.notification.service.event; -import org.apache.falcon.notification.service.NotificationServicesRegistry; import org.apache.falcon.state.ID; /** * An events that are generated by notification services must implement this interface. */ -public interface Event { +public abstract class Event { - /** - * @return The service that generated this event - */ - NotificationServicesRegistry.SERVICE getSource(); + + protected EventType type; + + public EventType getType() { + return this.type; + } /** * @return ID of the notification handler for which this event was meant for. */ - ID getTarget(); + public abstract ID getTarget(); } http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/notification/service/event/EventType.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/EventType.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/EventType.java new file mode 100644 index 0000000..59f5cba --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/EventType.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.notification.service.event; + +/** + * Types of event. + */ +public enum EventType { + TIME_ELAPSED, + DATA_AVAILABLE, + JOB_COMPLETED, + JOB_SCHEDULED + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java index c587343..df7c621 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java @@ -17,7 +17,6 @@ */ package org.apache.falcon.notification.service.event; -import org.apache.falcon.notification.service.NotificationServicesRegistry; import org.apache.falcon.state.ID; import org.apache.oozie.client.WorkflowJob; import org.joda.time.DateTime; @@ -26,7 +25,7 @@ import org.joda.time.DateTime; * An event generated by {@link org.apache.falcon.notification.service.impl.JobCompletionService} * indicating completion of a Job. */ -public class JobCompletedEvent implements Event { +public class JobCompletedEvent extends Event { private WorkflowJob.Status status; private final ID callbackID; @@ -36,6 +35,7 @@ public class JobCompletedEvent implements Event { this.callbackID = callbackID; this.status = jobStatus; this.endTime = end; + this.type = EventType.JOB_COMPLETED; } public WorkflowJob.Status getStatus() { @@ -43,11 +43,6 @@ public class JobCompletedEvent implements Event { } @Override - public NotificationServicesRegistry.SERVICE getSource() { - return NotificationServicesRegistry.SERVICE.JOB_COMPLETION; - } - - @Override public ID getTarget() { return callbackID; } http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java index 55023e7..3f48cdc 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java @@ -17,7 +17,6 @@ */ package org.apache.falcon.notification.service.event; -import org.apache.falcon.notification.service.NotificationServicesRegistry; import org.apache.falcon.state.ID; import org.joda.time.DateTime; @@ -25,7 +24,7 @@ import org.joda.time.DateTime; * An event generated by {@link org.apache.falcon.notification.service.impl.SchedulerService} * indicating if an instance was scheduled for execution. */ -public class JobScheduledEvent implements Event { +public class JobScheduledEvent extends Event { private final ID callbackID; private String externalID; private STATUS status; @@ -34,6 +33,7 @@ public class JobScheduledEvent implements Event { public JobScheduledEvent(ID callbackID, STATUS status) { this.callbackID = callbackID; this.status = status; + this.type = EventType.JOB_SCHEDULED; } public String getExternalID() { @@ -45,11 +45,6 @@ public class JobScheduledEvent implements Event { } @Override - public NotificationServicesRegistry.SERVICE getSource() { - return NotificationServicesRegistry.SERVICE.JOB_SCHEDULE; - } - - @Override public ID getTarget() { return callbackID; } http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java index 7ec4de6..84738ad 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java @@ -17,7 +17,6 @@ */ package org.apache.falcon.notification.service.event; -import org.apache.falcon.notification.service.NotificationServicesRegistry; import org.apache.falcon.state.ID; import org.joda.time.DateTime; @@ -25,7 +24,7 @@ import org.joda.time.DateTime; * An event generated by {@link org.apache.falcon.notification.service.impl.AlarmService} * indicating that a given time duration has elapsed. */ -public class TimeElapsedEvent implements Event { +public class TimeElapsedEvent extends Event { private DateTime startTime; private DateTime endTime; private DateTime instanceTime; @@ -48,11 +47,7 @@ public class TimeElapsedEvent implements Event { this.startTime = start; this.endTime = end; this.instanceTime = instTime; - } - - @Override - public NotificationServicesRegistry.SERVICE getSource() { - return NotificationServicesRegistry.SERVICE.TIME; + this.type = EventType.TIME_ELAPSED; } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java index 848f89c..a70bc3c 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java @@ -34,6 +34,7 @@ import org.apache.falcon.execution.NotificationHandler; import org.apache.falcon.notification.service.FalconNotificationService; import org.apache.falcon.notification.service.NotificationServicesRegistry; import org.apache.falcon.notification.service.event.Event; +import org.apache.falcon.notification.service.event.EventType; import org.apache.falcon.notification.service.event.JobScheduledEvent; import org.apache.falcon.notification.service.request.JobCompletionNotificationRequest; import org.apache.falcon.notification.service.request.JobScheduleNotificationRequest; @@ -179,7 +180,7 @@ public class SchedulerService implements FalconNotificationService, Notification @Override public void onEvent(Event event) throws FalconException { // Interested only in job completion events. - if (event.getSource() == NotificationServicesRegistry.SERVICE.JOB_COMPLETION) { + if (event.getType() == EventType.JOB_COMPLETED) { try { // Check if the instance is awaited. ID id = event.getTarget(); http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java index fb4ce82..fb4c8c9 100644 --- a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java +++ b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java @@ -20,9 +20,9 @@ package org.apache.falcon.predicate; import org.apache.falcon.FalconException; import org.apache.falcon.entity.v0.feed.Location; import org.apache.falcon.execution.NotificationHandler; -import org.apache.falcon.notification.service.NotificationServicesRegistry; import org.apache.falcon.notification.service.event.DataEvent; import org.apache.falcon.notification.service.event.Event; +import org.apache.falcon.notification.service.event.EventType; import org.apache.falcon.notification.service.event.TimeElapsedEvent; import org.apache.falcon.state.ID; @@ -173,7 +173,7 @@ public class Predicate implements Serializable { * @throws FalconException */ public static Predicate getPredicate(Event event) throws FalconException { - if (event.getSource() == NotificationServicesRegistry.SERVICE.DATA) { + if (event.getType() == EventType.DATA_AVAILABLE) { DataEvent dataEvent = (DataEvent) event; if (dataEvent.getDataLocation() != null && dataEvent.getDataType() != null) { Location loc = new Location(); @@ -183,7 +183,7 @@ public class Predicate implements Serializable { } else { throw new FalconException("Event does not have enough data to create a predicate"); } - } else if (event.getSource() == NotificationServicesRegistry.SERVICE.TIME) { + } else if (event.getType() == EventType.TIME_ELAPSED) { TimeElapsedEvent timeEvent = (TimeElapsedEvent) event; if (timeEvent.getStartTime() != null && timeEvent.getEndTime() != null) { long instanceTime = (timeEvent.getInstanceTime() == null)? -1 : timeEvent.getInstanceTime().getMillis(); @@ -194,7 +194,7 @@ public class Predicate implements Serializable { } } else { - throw new FalconException("Unhandled event type " + event.getSource()); + throw new FalconException("Unhandled event type " + event.getType()); } }
