Repository: falcon Updated Branches: refs/heads/master f3b781dec -> c4958773d
FALCON-1607 Native Scheduler - Code refactoring: Refactor ID into more specific sub classes. Contributed by Ajay Yadava. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/c4958773 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/c4958773 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/c4958773 Branch: refs/heads/master Commit: c4958773db9f7abc771c99b6b704951083020cc8 Parents: f3b781d Author: Ajay Yadava <[email protected]> Authored: Mon Nov 23 19:53:33 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Mon Nov 23 19:53:33 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 4 + .../apache/falcon/execution/EntityExecutor.java | 6 +- .../falcon/execution/ExecutionInstance.java | 4 +- .../execution/FalconExecutionService.java | 34 ++-- .../execution/ProcessExecutionInstance.java | 8 +- .../falcon/execution/ProcessExecutor.java | 44 ++--- .../service/impl/JobCompletionService.java | 15 +- .../service/impl/SchedulerService.java | 57 +++--- .../apache/falcon/state/EntityClusterID.java | 51 ++++++ .../java/org/apache/falcon/state/EntityID.java | 51 ++++++ .../main/java/org/apache/falcon/state/ID.java | 177 +++---------------- .../org/apache/falcon/state/InstanceID.java | 83 +++++++++ .../org/apache/falcon/state/InstanceState.java | 5 + .../org/apache/falcon/state/StateService.java | 4 +- .../falcon/state/store/AbstractStateStore.java | 8 +- .../falcon/state/store/EntityStateStore.java | 8 +- .../falcon/state/store/InMemoryStateStore.java | 62 +++---- .../falcon/state/store/InstanceStateStore.java | 16 +- .../workflow/engine/FalconWorkflowEngine.java | 6 +- .../execution/FalconExecutionServiceTest.java | 33 ++-- .../notification/service/AlarmServiceTest.java | 6 +- .../service/SchedulerServiceTest.java | 17 +- .../apache/falcon/predicate/PredicateTest.java | 6 +- .../falcon/state/InstanceStateServiceTest.java | 2 +- 24 files changed, 398 insertions(+), 309 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 700f7e0..e4ee8f8 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -18,6 +18,10 @@ Trunk (Unreleased) FALCON-1213 Base framework of the native scheduler(Pallavi Rao) IMPROVEMENTS + FALCON-1607 Native Scheduler - Code refactoring: Refactor ID into more specific sub classes(Ajay Yadava) + + FALCON-1587 Divide FalconCLI.twiki into sub sections for different modules on the lines of REST Api(Praveen Adlakha via Ajay Yadava) + FALCON-1552 Migration of ProcessInstanceManagerIT to use falcon unit (Narayan Periwal via Pallavi Rao) FALCON-1486 Add Unit Test cases for HiveDR(Peeyush Bishnoi via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java index 9b07b9e..88d88c1 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java @@ -20,7 +20,7 @@ package org.apache.falcon.execution; import org.apache.falcon.FalconException; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.state.ID; +import org.apache.falcon.state.EntityClusterID; import org.apache.falcon.state.InstanceStateChangeHandler; import org.apache.falcon.state.store.AbstractStateStore; import org.apache.falcon.state.store.StateStore; @@ -37,7 +37,7 @@ public abstract class EntityExecutor implements NotificationHandler, InstanceSta public static final String DEFAULT_CACHE_SIZE = "20"; protected String cluster; protected static final StateStore STATE_STORE = AbstractStateStore.get(); - protected ID id; + protected EntityClusterID id; /** * Schedules execution instances for the entity. Idempotent operation. @@ -105,7 +105,7 @@ public abstract class EntityExecutor implements NotificationHandler, InstanceSta /** * @return ID of the entity */ - public ID getId() { + public EntityClusterID getId() { return id; } } http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java index 3869ff2..2d6b67d 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java @@ -21,7 +21,7 @@ package org.apache.falcon.execution; import org.apache.falcon.FalconException; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.predicate.Predicate; -import org.apache.falcon.state.ID; +import org.apache.falcon.state.InstanceID; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -84,7 +84,7 @@ public abstract class ExecutionInstance implements NotificationHandler { /** * @return The unique ID of this instance. The instance is referred using this ID inside the system. */ - public abstract ID getId(); + public abstract InstanceID getId(); /** * @return - The entity to which this instance belongs. http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java index b959320..b48a65b 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java @@ -24,9 +24,10 @@ import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.notification.service.event.Event; import org.apache.falcon.service.FalconService; +import org.apache.falcon.state.EntityClusterID; import org.apache.falcon.state.EntityState; import org.apache.falcon.state.EntityStateChangeHandler; -import org.apache.falcon.state.ID; +import org.apache.falcon.state.InstanceID; import org.apache.falcon.state.StateService; import org.apache.falcon.state.store.AbstractStateStore; import org.slf4j.Logger; @@ -45,7 +46,7 @@ public final class FalconExecutionService implements FalconService, EntityStateC private static final Logger LOG = LoggerFactory.getLogger(FalconExecutionService.class); // Stores all entity executors in memory - private ConcurrentMap<ID, EntityExecutor> executors = new ConcurrentHashMap<ID, EntityExecutor>(); + private ConcurrentMap<EntityClusterID, EntityExecutor> executors = new ConcurrentHashMap<>(); private static FalconExecutionService executionService = new FalconExecutionService(); @@ -61,7 +62,7 @@ public final class FalconExecutionService implements FalconService, EntityStateC try { for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) { EntityExecutor executor = createEntityExecutor(entity, cluster); - executors.put(new ID(entity, cluster), executor); + executors.put(new EntityClusterID(entity, cluster), executor); executor.schedule(); } } catch (FalconException e) { @@ -108,12 +109,21 @@ public final class FalconExecutionService implements FalconService, EntityStateC @Override public void onEvent(Event event) throws FalconException { // Currently, simply passes along the event to the appropriate executor - EntityExecutor executor = executors.get(event.getTarget().getEntityID()); - if (executor == null) { - // The executor has gone away, throw an exception so the notification service knows - throw new FalconException("Target executor for " + event.getTarget().getEntityID() + " does not exist."); + EntityClusterID id = null; + if (event.getTarget() instanceof EntityClusterID) { + id = (EntityClusterID) event.getTarget(); + } else if (event.getTarget() instanceof InstanceID) { + id = ((InstanceID) event.getTarget()).getEntityClusterID(); + } + + if (id != null) { + EntityExecutor executor = executors.get(id); + if (executor == null) { + // The executor has gone away, throw an exception so the notification service knows + throw new FalconException("Target executor for " + event.getTarget() + " does not exist."); + } + executor.onEvent(event); } - executor.onEvent(event); } @Override @@ -125,7 +135,7 @@ public final class FalconExecutionService implements FalconService, EntityStateC public void onSchedule(Entity entity) throws FalconException { for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) { EntityExecutor executor = createEntityExecutor(entity, cluster); - ID id = new ID(entity, cluster); + EntityClusterID id = new EntityClusterID(entity, cluster); executors.put(id, executor); LOG.info("Scheduling entity {}.", id); executor.schedule(); @@ -144,7 +154,7 @@ public final class FalconExecutionService implements FalconService, EntityStateC public void onResume(Entity entity) throws FalconException { for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) { EntityExecutor executor = createEntityExecutor(entity, cluster); - executors.put(new ID(entity, cluster), executor); + executors.put(new EntityClusterID(entity, cluster), executor); LOG.info("Resuming entity, {} of type {} on cluster {}.", entity.getName(), entity.getEntityType(), cluster); executor.resumeAll(); @@ -204,11 +214,11 @@ public final class FalconExecutionService implements FalconService, EntityStateC * @throws FalconException */ public EntityExecutor getEntityExecutor(Entity entity, String cluster) throws FalconException { - ID id = new ID(entity, cluster); + EntityClusterID id = new EntityClusterID(entity, cluster); if (executors.containsKey(id)) { return executors.get(id); } else { - throw new FalconException("Entity executor for entity : " + id.getEntityKey() + " does not exist."); + throw new FalconException("Entity executor for entity cluster key : " + id.getKey() + " does not exist."); } } } http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java index 8c84f2b..434f168 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java @@ -36,7 +36,7 @@ import org.apache.falcon.notification.service.event.JobCompletedEvent; import org.apache.falcon.notification.service.event.JobScheduledEvent; import org.apache.falcon.notification.service.impl.DataAvailabilityService; import org.apache.falcon.predicate.Predicate; -import org.apache.falcon.state.ID; +import org.apache.falcon.state.InstanceID; import org.apache.falcon.util.RuntimeProperties; import org.apache.falcon.workflow.engine.DAGEngine; import org.apache.falcon.workflow.engine.DAGEngineFactory; @@ -60,7 +60,7 @@ public class ProcessExecutionInstance extends ExecutionInstance { private List<Predicate> awaitedPredicates = new ArrayList<Predicate>(); private DAGEngine dagEngine = null; private boolean hasTimedOut = false; - private ID id; + private InstanceID id; private int instanceSequence; private final FalconExecutionService executionService = FalconExecutionService.get(); @@ -75,7 +75,7 @@ public class ProcessExecutionInstance extends ExecutionInstance { public ProcessExecutionInstance(Process process, DateTime instanceTime, String cluster) throws FalconException { super(instanceTime, cluster); this.process = process; - this.id = new ID(process, cluster, getInstanceTime()); + this.id = new InstanceID(process, cluster, getInstanceTime()); computeInstanceSequence(); dagEngine = DAGEngineFactory.getDAGEngine(cluster); registerForNotifications(false); @@ -210,7 +210,7 @@ public class ProcessExecutionInstance extends ExecutionInstance { } @Override - public ID getId() { + public InstanceID getId() { return id; } http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java index d10d2fd..e446069 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java @@ -32,12 +32,13 @@ import org.apache.falcon.notification.service.event.Event; import org.apache.falcon.notification.service.event.EventType; import org.apache.falcon.notification.service.event.JobCompletedEvent; import org.apache.falcon.notification.service.event.TimeElapsedEvent; +import org.apache.falcon.notification.service.impl.AlarmService; import org.apache.falcon.notification.service.impl.JobCompletionService; import org.apache.falcon.notification.service.impl.SchedulerService; -import org.apache.falcon.notification.service.impl.AlarmService; import org.apache.falcon.predicate.Predicate; +import org.apache.falcon.state.EntityClusterID; import org.apache.falcon.state.EntityState; -import org.apache.falcon.state.ID; +import org.apache.falcon.state.InstanceID; import org.apache.falcon.state.InstanceState; import org.apache.falcon.state.StateService; import org.apache.falcon.util.StartupProperties; @@ -58,7 +59,7 @@ import java.util.TimeZone; */ public class ProcessExecutor extends EntityExecutor { private static final Logger LOG = LoggerFactory.getLogger(ProcessExecutor.class); - protected LoadingCache<ID, ProcessExecutionInstance> instances; + protected LoadingCache<InstanceID, ProcessExecutionInstance> instances; private Predicate triggerPredicate; private final Process process; private final StateService stateService = StateService.get(); @@ -74,7 +75,7 @@ public class ProcessExecutor extends EntityExecutor { public ProcessExecutor(Process proc, String clusterName) throws FalconException { process = proc; cluster = clusterName; - id = new ID(proc, clusterName); + id = new EntityClusterID(proc, clusterName); } @Override @@ -84,7 +85,7 @@ public class ProcessExecutor extends EntityExecutor { initInstances(); } // Check to handle restart and restoration from state store. - if (STATE_STORE.getEntity(id).getCurrentState() != EntityState.STATE.SCHEDULED) { + if (STATE_STORE.getEntity(id.getEntityID()).getCurrentState() != EntityState.STATE.SCHEDULED) { dryRun(); } else { LOG.info("Process, {} was already scheduled on cluster, {}.", process.getName(), cluster); @@ -105,9 +106,9 @@ public class ProcessExecutor extends EntityExecutor { instances = CacheBuilder.newBuilder() .maximumSize(cacheSize) - .build(new CacheLoader<ID, ProcessExecutionInstance>() { + .build(new CacheLoader<InstanceID, ProcessExecutionInstance>() { @Override - public ProcessExecutionInstance load(ID id) throws Exception { + public ProcessExecutionInstance load(InstanceID id) throws Exception { return (ProcessExecutionInstance) STATE_STORE.getExecutionInstance(id).getInstance(); } }); @@ -289,13 +290,16 @@ public class ProcessExecutor extends EntityExecutor { handleEvent(event); } else { // Else, pass it along to the execution instance - ProcessExecutionInstance instance = instances.get(event.getTarget()); - if (instance != null) { - instance.onEvent(event); - if (instance.isReady()) { - stateService.handleStateChange(instance, InstanceState.EVENT.CONDITIONS_MET, this); - } else if (instance.hasTimedout()) { - stateService.handleStateChange(instance, InstanceState.EVENT.TIME_OUT, this); + if (event.getTarget() instanceof InstanceID) { + InstanceID instanceID = (InstanceID) event.getTarget(); + ProcessExecutionInstance instance = instances.get(instanceID); + if (instance != null) { + instance.onEvent(event); + if (instance.isReady()) { + stateService.handleStateChange(instance, InstanceState.EVENT.CONDITIONS_MET, this); + } else if (instance.hasTimedout()) { + stateService.handleStateChange(instance, InstanceState.EVENT.TIME_OUT, this); + } } } } @@ -307,16 +311,17 @@ public class ProcessExecutor extends EntityExecutor { private void handleEvent(Event event) throws FalconException { ProcessExecutionInstance instance; + InstanceID instanceID; try { switch (event.getType()) { // TODO : Handle cases where scheduling fails. case JOB_SCHEDULED: - instance = instances.get(event.getTarget()); + instance = instances.get((InstanceID)event.getTarget()); instance.onEvent(event); stateService.handleStateChange(instance, InstanceState.EVENT.SCHEDULE, this); break; case JOB_COMPLETED: - instance = instances.get(event.getTarget()); + instance = instances.get((InstanceID)event.getTarget()); instance.onEvent(event); switch (((JobCompletedEvent) event).getStatus()) { case SUCCEEDED: @@ -387,11 +392,6 @@ public class ProcessExecutor extends EntityExecutor { triggerPredicate = Predicate.createTimePredicate(startTime.getTime(), endTime.getTime(), -1); } - @Override - public ID getId() { - return id; - } - // This executor must handle any events intended for itself. // Or, if it is job run or job complete notifications, so it can handle the instance's state transition. private boolean shouldHandleEvent(Event event) { @@ -402,7 +402,7 @@ public class ProcessExecutor extends EntityExecutor { @Override public void onTrigger(ExecutionInstance instance) throws FalconException { - instances.put(new ID(instance), (ProcessExecutionInstance) instance); + instances.put(new InstanceID(instance), (ProcessExecutionInstance) instance); } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java index 73a4199..501c6aa 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java @@ -29,6 +29,7 @@ import org.apache.falcon.notification.service.request.JobCompletionNotificationR import org.apache.falcon.notification.service.request.NotificationRequest; import org.apache.falcon.service.Services; import org.apache.falcon.state.ID; +import org.apache.falcon.state.InstanceID; import org.apache.falcon.workflow.WorkflowExecutionArgs; import org.apache.falcon.workflow.WorkflowExecutionContext; import org.apache.falcon.workflow.WorkflowExecutionListener; @@ -46,7 +47,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.TimeZone; /** * This notification service notifies {@link NotificationHandler} when an external job @@ -55,7 +55,7 @@ import java.util.TimeZone; public class JobCompletionService implements FalconNotificationService, WorkflowExecutionListener { private static final Logger LOG = LoggerFactory.getLogger(JobCompletionService.class); - private static DateTimeZone utc = DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC")); + private static final DateTimeZone UTC = DateTimeZone.UTC; private List<NotificationHandler> listeners = Collections.synchronizedList(new ArrayList<NotificationHandler>()); @@ -152,11 +152,12 @@ public class JobCompletionService implements FalconNotificationService, Workflow } // Constructs the callback ID from the details available in the context. - private ID constructCallbackID(WorkflowExecutionContext context) throws FalconException { - ID id = new ID(EntityType.valueOf(context.getEntityType()), context.getEntityName()); - id.setCluster(context.getClusterName()); - id.setInstanceTime(new DateTime(EntityUtil.parseDateUTC(context.getNominalTimeAsISO8601()), utc)); - return id; + private InstanceID constructCallbackID(WorkflowExecutionContext context) throws FalconException { + EntityType entityType = EntityType.valueOf(context.getEntityType()); + String entityName = context.getEntityName(); + String clusterName = context.getClusterName(); + DateTime instanceTime = new DateTime(EntityUtil.parseDateUTC(context.getNominalTimeAsISO8601()), UTC); + return new InstanceID(entityType, entityName, clusterName, instanceTime); } private WorkflowExecutionContext createContext(Properties props) { http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java index a70bc3c..ace8444 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java @@ -40,7 +40,9 @@ import org.apache.falcon.notification.service.request.JobCompletionNotificationR import org.apache.falcon.notification.service.request.JobScheduleNotificationRequest; import org.apache.falcon.notification.service.request.NotificationRequest; import org.apache.falcon.predicate.Predicate; +import org.apache.falcon.state.EntityClusterID; import org.apache.falcon.state.ID; +import org.apache.falcon.state.InstanceID; import org.apache.falcon.state.InstanceState; import org.apache.falcon.state.store.AbstractStateStore; import org.apache.falcon.state.store.StateStore; @@ -81,9 +83,9 @@ public class SchedulerService implements FalconNotificationService, Notification private static final StateStore STATE_STORE = AbstractStateStore.get(); - private Cache<ID, Object> instancesToIgnore; + private Cache<InstanceID, Object> instancesToIgnore; // TODO : limit the no. of awaiting instances per entity - private LoadingCache<ID, List<ExecutionInstance>> awaitedInstances; + private LoadingCache<EntityClusterID, List<ExecutionInstance>> executorAwaitedInstances; @Override public void register(NotificationRequest notifRequest) throws NotificationServiceException { @@ -102,11 +104,10 @@ public class SchedulerService implements FalconNotificationService, Notification @Override public void unregister(NotificationHandler handler, ID listenerID) { // If ID is that of an entity, do nothing - if (listenerID.getInstanceTime() == null) { - return; + if (listenerID instanceof InstanceID) { + // Not efficient to iterate over elements to remove this. Add to ignore list. + instancesToIgnore.put((InstanceID) listenerID, new Object()); } - // Not efficient to iterate over elements to remove this. Add to ignore list. - instancesToIgnore.put(listenerID, new Object()); } @@ -129,21 +130,21 @@ public class SchedulerService implements FalconNotificationService, Notification PriorityBlockingQueue<Runnable> pq = new PriorityBlockingQueue<Runnable>(20, new PriorityComparator()); runQueue = new ThreadPoolExecutor(1, numThreads, 0L, TimeUnit.MILLISECONDS, pq); - CacheLoader instanceCacheLoader = new CacheLoader<ID, Collection<ExecutionInstance>>() { + CacheLoader instanceCacheLoader = new CacheLoader<EntityClusterID, Collection<ExecutionInstance>>() { @Override - public Collection<ExecutionInstance> load(ID id) throws Exception { + public Collection<ExecutionInstance> load(EntityClusterID id) throws Exception { List<InstanceState.STATE> states = new ArrayList<InstanceState.STATE>(); states.add(InstanceState.STATE.READY); List<ExecutionInstance> readyInstances = new ArrayList<>(); // TODO : Limit it to no. of instances that can be run in parallel. - for (InstanceState state : STATE_STORE.getExecutionInstances(id.getEntityID(), states)) { + for (InstanceState state : STATE_STORE.getExecutionInstances(id, states)) { readyInstances.add(state.getInstance()); } return readyInstances; } }; - awaitedInstances = CacheBuilder.newBuilder() + executorAwaitedInstances = CacheBuilder.newBuilder() .maximumSize(100) .concurrencyLevel(1) .removalListener(this) @@ -182,13 +183,18 @@ public class SchedulerService implements FalconNotificationService, Notification // Interested only in job completion events. if (event.getType() == EventType.JOB_COMPLETED) { try { + ID targetID = event.getTarget(); + List<ExecutionInstance> instances = null; // Check if the instance is awaited. - ID id = event.getTarget(); - List<ExecutionInstance> instances = awaitedInstances.get(id); - // Else, check if the entity is awaited. - if (instances == null) { - id = id.getEntityID(); - instances = awaitedInstances.get(id); + if (targetID instanceof EntityClusterID) { + EntityClusterID id = (EntityClusterID) event.getTarget(); + instances = executorAwaitedInstances.get(id); + if (instances != null && instances.isEmpty()) { + executorAwaitedInstances.invalidate(id); + } + } else if (targetID instanceof InstanceID) { + InstanceID id = (InstanceID) event.getTarget(); + instances = executorAwaitedInstances.get(id.getEntityClusterID()); } if (instances != null && !instances.isEmpty()) { ExecutionInstance instance = instances.get(0); @@ -202,19 +208,12 @@ public class SchedulerService implements FalconNotificationService, Notification handler, instance.getId()); requestBuilder.setInstance(instance); InstanceRunner runner = new InstanceRunner(requestBuilder.build()); - // Since an instance just finished of the same entity just finished - if (id.equals(instance.getId())) { - runner.incrementAllowedInstances(); - } runQueue.execute(runner); instances.remove(instance); } } } } - if (instances != null && instances.isEmpty()) { - awaitedInstances.invalidate(id); - } } catch (Exception e) { throw new FalconException(e); } @@ -304,11 +303,11 @@ public class SchedulerService implements FalconNotificationService, Notification if (instanceCheck() && dependencyCheck()) { return true; } else { - ID entityID = instance.getId().getEntityID(); + EntityClusterID entityID = instance.getId().getEntityClusterID(); // Instance is awaiting scheduling conditions to be met. Add predicate to that effect. instance.getAwaitingPredicates().add(Predicate.createJobCompletionPredicate(request.getHandler(), entityID)); - updateAwaitedInstances(entityID); + updateExecutorAwaitedInstances(entityID); LOG.debug("Schedule conditions not met for instance {}. Awaiting on {}", instance.getId(), entityID); } @@ -319,13 +318,13 @@ public class SchedulerService implements FalconNotificationService, Notification return false; } - private void updateAwaitedInstances(ID id) throws ExecutionException { + private void updateExecutorAwaitedInstances(EntityClusterID id) throws ExecutionException { synchronized (id) { - List<ExecutionInstance> instances = awaitedInstances.get(id); + List<ExecutionInstance> instances = executorAwaitedInstances.get(id); if (instances == null) { // Order is FIFO. instances = new LinkedList<>(); - awaitedInstances.put(id, instances); + executorAwaitedInstances.put(id, instances); } instances.add(instance); } @@ -340,7 +339,7 @@ public class SchedulerService implements FalconNotificationService, Notification // Dependants should wait for this instance to complete. Add predicate to that effect. instance.getAwaitingPredicates().add(Predicate.createJobCompletionPredicate( request.getHandler(), execInstance.getId())); - updateAwaitedInstances(execInstance.getId()); + updateExecutorAwaitedInstances(execInstance.getId().getEntityClusterID()); } return false; } http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/state/EntityClusterID.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityClusterID.java b/scheduler/src/main/java/org/apache/falcon/state/EntityClusterID.java new file mode 100644 index 0000000..b25a547 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/state/EntityClusterID.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.state; + +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; + +/** + * A unique ID for an entity, cluster pair. + * This is required in scenarios like scheduling an entity per cluster. + */ +public class EntityClusterID extends ID { + + private final String clusterName; + + public EntityClusterID(Entity entity, String clusterName) { + this(entity.getEntityType(), entity.getName(), clusterName); + } + + public EntityClusterID(EntityType entityType, String entityName, String clusterName) { + this.entityName = entityName; + this.entityType = entityType; + this.clusterName = clusterName; + this.key = this.entityType + KEY_SEPARATOR + this.entityName + KEY_SEPARATOR + clusterName; + } + + public String getClusterName() { + return clusterName; + } + + @Override + public EntityID getEntityID() { + return new EntityID(entityType, entityName); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/state/EntityID.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityID.java b/scheduler/src/main/java/org/apache/falcon/state/EntityID.java new file mode 100644 index 0000000..cf37986 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/state/EntityID.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.state; + +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.datanucleus.util.StringUtils; + +/** + * A unique id for an entity(wrapped). + */ +public class EntityID extends ID { + + public EntityID(EntityType entityType, String entityName) { + assert entityType != null : "Entity type must be present."; + assert !StringUtils.isEmpty(entityName) : "Entity name must be present."; + this.entityName = entityName; + this.entityType = entityType; + this.key = this.entityType + KEY_SEPARATOR + this.entityName; + } + + public EntityID(Entity entity) { + this(entity.getEntityType(), entity.getName()); + } + + @Override + public String toString() { + return key; + } + + @Override + public EntityID getEntityID() { + return this; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/state/ID.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/ID.java b/scheduler/src/main/java/org/apache/falcon/state/ID.java index 420c856..e93dbd3 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/ID.java +++ b/scheduler/src/main/java/org/apache/falcon/state/ID.java @@ -17,172 +17,30 @@ */ package org.apache.falcon.state; -import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.execution.ExecutionInstance; -import org.datanucleus.util.StringUtils; -import org.joda.time.DateTime; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; import java.io.Serializable; /** * A serializable, comparable ID used to uniquely represent an entity or an instance. */ -public final class ID implements Serializable, Comparable<ID> { +public abstract class ID implements Serializable, Comparable<ID> { public static final String KEY_SEPARATOR = "/"; - public static final String INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm"; - - private String entityName; - private EntityType entityType; - private String entityKey; - private String cluster; - private DateTime instanceTime; - - /** - * Default Constructor. - */ - public ID(){} - - /** - * Constructor. - * - * @param type - * @param name - */ - public ID(EntityType type, String name) { - assert type != null : "Entity type must be present."; - assert !StringUtils.isEmpty(name) : "Entity name must be present."; - this.entityName = name; - this.entityType = type; - this.entityKey = entityType + KEY_SEPARATOR + entityName; - } - - /** - * Constructor. - * - * @param entity - */ - public ID(Entity entity) { - this(entity.getEntityType(), entity.getName()); - } - - /** - * Constructor. - * - * @param entity - * @param cluster - */ - public ID(Entity entity, String cluster) { - this(entity.getEntityType(), entity.getName()); - assert !StringUtils.isEmpty(cluster) : "Cluster cannot be empty."; - this.cluster = cluster; - } - - /** - * Constructor. - * - * @param instance - */ - public ID(ExecutionInstance instance) { - this(instance.getEntity(), instance.getCluster()); - assert instance.getInstanceTime() != null : "Nominal time cannot be null."; - this.instanceTime = instance.getInstanceTime(); - } - - /** - * Constructor. - * - * @param entity - * @param cluster - * @param instanceTime - */ - public ID(Entity entity, String cluster, DateTime instanceTime) { - this(entity, cluster); - assert instanceTime != null : "Nominal time cannot be null."; - this.instanceTime = instanceTime; - } - - /** - * @return cluster name - */ - public String getCluster() { - return cluster; - } - - /** - * @param cluster name - */ - public void setCluster(String cluster) { - this.cluster = cluster; - } - - /** - * @return nominal time - */ - public DateTime getInstanceTime() { - return instanceTime; - } - - /** - * @param instanceTime - */ - public void setInstanceTime(DateTime instanceTime) { - this.instanceTime = instanceTime; - } - - /** - * @return entity name - */ - public String getEntityName() { - return entityName; - } - - /** - * @return entity type - */ - public EntityType getEntityType() { - return entityType; - } - - @Override - public String toString() { - String val = entityKey; - if (!StringUtils.isEmpty(cluster)) { - val = val + KEY_SEPARATOR + cluster; - } - - if (instanceTime != null) { - DateTimeFormatter fmt = DateTimeFormat.forPattern(INSTANCE_FORMAT); - val = val + KEY_SEPARATOR + fmt.print(instanceTime); - } - return val; - } - - /** - * @return An ID without the cluster name - */ - public String getEntityKey() { - return entityKey; - } - - /** - * @return ID without the instance information - */ - public ID getEntityID() { - ID newID = new ID(this.entityType, this.entityName); - newID.setCluster(this.cluster); - newID.setInstanceTime(null); - return newID; - } + protected String entityName; + protected EntityType entityType; + protected String key; @Override public boolean equals(Object id) { if (id == null || id.getClass() != getClass()) { return false; } - return compareTo((ID)id) == 0; + return compareTo((ID) id) == 0; + } + + @Override + public String toString() { + return key; } @Override @@ -197,4 +55,19 @@ public final class ID implements Serializable, Comparable<ID> { } return this.toString().compareTo(id.toString()); } + + public String getEntityName() { + return entityName; + } + + public EntityType getEntityType() { + return entityType; + } + + public String getKey() { + return key; + } + + public abstract EntityID getEntityID(); + } http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java new file mode 100644 index 0000000..a722be9 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.state; + +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.execution.ExecutionInstance; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +/** + * A unique ID for a given(wrapped) instance. + * An instance is the execution unit of an entity and can be uniquely defined by + * (entity, cluster, instanceTime). + */ +public class InstanceID extends ID { + public static final String INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm"; + + /** + * Name of the cluster for the instance. + */ + private String clusterName; + + /** + * + */ + private DateTime instanceTime; + + public InstanceID(EntityType entityType, String entityName, String clusterName, DateTime instanceTime) { + this.entityType = entityType; + this.entityName = entityName; + this.clusterName = clusterName; + this.instanceTime = new DateTime(instanceTime); + DateTimeFormatter fmt = DateTimeFormat.forPattern(INSTANCE_FORMAT); + this.key = this.entityType + KEY_SEPARATOR + this.entityName + KEY_SEPARATOR + this.clusterName + + KEY_SEPARATOR + fmt.print(instanceTime); + } + + public InstanceID(Entity entity, String clusterName, DateTime instanceTime) { + this(entity.getEntityType(), entity.getName(), clusterName, instanceTime); + } + + + public InstanceID(ExecutionInstance instance) { + this(instance.getEntity(), instance.getCluster(), instance.getInstanceTime()); + assert instance.getInstanceTime() != null : "Instance time cannot be null."; + } + + public String getClusterName() { + return clusterName; + } + + public DateTime getInstanceTime() { + return instanceTime; + } + + @Override + public EntityID getEntityID() { + return new EntityID(entityType, entityName); + } + + + public EntityClusterID getEntityClusterID() { + return new EntityClusterID(entityType, entityName, clusterName); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java index 8cf24ee..ada9d2b 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java +++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java @@ -247,4 +247,9 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance states.add(STATE.SUCCEEDED); return states; } + + @Override + public String toString() { + return instance.getId().toString() + "STATE: " + currentState.toString(); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/state/StateService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/StateService.java b/scheduler/src/main/java/org/apache/falcon/state/StateService.java index 81357a4..c1671ac 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/StateService.java +++ b/scheduler/src/main/java/org/apache/falcon/state/StateService.java @@ -64,7 +64,7 @@ public final class StateService { */ public void handleStateChange(Entity entity, EntityState.EVENT event, EntityStateChangeHandler handler) throws FalconException { - ID id = new ID(entity); + EntityID id = new EntityID(entity); if (!stateStore.entityExists(id)) { // New entity if (event == EntityState.EVENT.SUBMIT) { @@ -122,7 +122,7 @@ public final class StateService { */ public void handleStateChange(ExecutionInstance instance, InstanceState.EVENT event, InstanceStateChangeHandler handler) throws FalconException { - ID id = new ID(instance); + InstanceID id = new InstanceID(instance); if (!stateStore.executionInstanceExists(id)) { // New instance if (event == InstanceState.EVENT.TRIGGER) { http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java index ba3d5fd..e36f85c 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java @@ -21,8 +21,8 @@ import org.apache.falcon.FalconException; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.service.ConfigurationChangeListener; +import org.apache.falcon.state.EntityID; import org.apache.falcon.state.EntityState; -import org.apache.falcon.state.ID; import org.apache.falcon.util.ReflectionUtils; import org.apache.falcon.util.StartupProperties; import org.slf4j.Logger; @@ -46,14 +46,14 @@ public abstract class AbstractStateStore implements StateStore, ConfigurationCha public void onRemove(Entity entity) throws FalconException { // Delete entity should remove its instances too. if (entity.getEntityType() != EntityType.CLUSTER) { - deleteEntity(new ID(entity)); + deleteEntity(new EntityID(entity)); } } @Override public void onChange(Entity oldEntity, Entity newEntity) throws FalconException { if (newEntity.getEntityType() != EntityType.CLUSTER) { - EntityState entityState = getEntity(new ID(oldEntity)); + EntityState entityState = getEntity(new EntityID(oldEntity)); if (entityState == null) { onAdd(newEntity); } else { @@ -67,7 +67,7 @@ public abstract class AbstractStateStore implements StateStore, ConfigurationCha public void onReload(Entity entity) throws FalconException { if (entity.getEntityType() != EntityType.CLUSTER) { // To ensure the config store and state store are in sync - if (!entityExists(new ID(entity))) { + if (!entityExists(new EntityID(entity))) { LOG.info("State store missing entity {}. Adding it.", entity.getName()); onAdd(entity); } http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java index 4aa6fdb..113f4c5 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java @@ -19,8 +19,8 @@ package org.apache.falcon.state.store; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.exception.StateStoreException; +import org.apache.falcon.state.EntityID; import org.apache.falcon.state.EntityState; -import org.apache.falcon.state.ID; import java.util.Collection; @@ -39,13 +39,13 @@ public interface EntityStateStore { * @return Entity corresponding to the key * @throws StateStoreException - If entity does not exist. */ - EntityState getEntity(ID entityId) throws StateStoreException; + EntityState getEntity(EntityID entityId) throws StateStoreException; /** * @param entityId * @return true, if entity exists in store. */ - boolean entityExists(ID entityId); + boolean entityExists(EntityID entityId); /** * @param state @@ -72,5 +72,5 @@ public interface EntityStateStore { * @param entityId * @throws StateStoreException */ - void deleteEntity(ID entityId) throws StateStoreException; + void deleteEntity(EntityID entityId) throws StateStoreException; } http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java index 64c5a59..52b3bb8 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java @@ -21,8 +21,10 @@ import com.google.common.collect.Lists; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.exception.StateStoreException; import org.apache.falcon.execution.ExecutionInstance; +import org.apache.falcon.state.EntityClusterID; +import org.apache.falcon.state.EntityID; import org.apache.falcon.state.EntityState; -import org.apache.falcon.state.ID; +import org.apache.falcon.state.InstanceID; import org.apache.falcon.state.InstanceState; import org.joda.time.DateTime; @@ -57,7 +59,7 @@ public final class InMemoryStateStore extends AbstractStateStore { @Override public void putEntity(EntityState entityState) throws StateStoreException { - String key = new ID(entityState.getEntity()).getEntityKey(); + String key = new EntityID(entityState.getEntity()).getKey(); if (entityStates.containsKey(key)) { throw new StateStoreException("Entity with key, " + key + " already exists."); } @@ -65,16 +67,16 @@ public final class InMemoryStateStore extends AbstractStateStore { } @Override - public EntityState getEntity(ID entityId) throws StateStoreException { - if (!entityStates.containsKey(entityId.getEntityKey())) { + public EntityState getEntity(EntityID entityId) throws StateStoreException { + if (!entityStates.containsKey(entityId.getKey())) { throw new StateStoreException("Entity with key, " + entityId + " does not exist."); } - return entityStates.get(entityId.getEntityKey()); + return entityStates.get(entityId.getKey()); } @Override - public boolean entityExists(ID entityId) { - return entityStates.containsKey(entityId.getEntityKey()); + public boolean entityExists(EntityID entityId) { + return entityStates.containsKey(entityId.getKey()); } @Override @@ -95,7 +97,7 @@ public final class InMemoryStateStore extends AbstractStateStore { @Override public void updateEntity(EntityState entityState) throws StateStoreException { - String key = new ID(entityState.getEntity()).getEntityKey(); + String key = new EntityID(entityState.getEntity()).getKey(); if (!entityStates.containsKey(key)) { throw new StateStoreException("Entity with key, " + key + " does not exist."); } @@ -103,17 +105,17 @@ public final class InMemoryStateStore extends AbstractStateStore { } @Override - public void deleteEntity(ID entityId) throws StateStoreException { - if (!entityStates.containsKey(entityId.getEntityKey())) { + public void deleteEntity(EntityID entityId) throws StateStoreException { + if (!entityStates.containsKey(entityId.getKey())) { throw new StateStoreException("Entity with key, " + entityId + " does not exist."); } deleteExecutionInstances(entityId); - entityStates.remove(entityId.getEntityKey()); + entityStates.remove(entityId.getKey()); } @Override public void putExecutionInstance(InstanceState instanceState) throws StateStoreException { - String key = new ID(instanceState.getInstance()).toString(); + String key = new InstanceID(instanceState.getInstance()).getKey(); if (instanceStates.containsKey(key)) { throw new StateStoreException("Instance with key, " + key + " already exists."); } @@ -121,8 +123,8 @@ public final class InMemoryStateStore extends AbstractStateStore { } @Override - public InstanceState getExecutionInstance(ID instanceId) throws StateStoreException { - if (!instanceStates.containsKey(instanceId.toString())) { + public InstanceState getExecutionInstance(InstanceID instanceId) throws StateStoreException { + if (!instanceStates.containsKey(instanceId.getKey())) { throw new StateStoreException("Instance with key, " + instanceId + " does not exist."); } return instanceStates.get(instanceId.toString()); @@ -130,7 +132,7 @@ public final class InMemoryStateStore extends AbstractStateStore { @Override public void updateExecutionInstance(InstanceState instanceState) throws StateStoreException { - String key = new ID(instanceState.getInstance()).toString(); + String key = new InstanceID(instanceState.getInstance()).getKey(); if (!instanceStates.containsKey(key)) { throw new StateStoreException("Instance with key, " + key + " does not exist."); } @@ -140,11 +142,11 @@ public final class InMemoryStateStore extends AbstractStateStore { @Override public Collection<InstanceState> getAllExecutionInstances(Entity entity, String cluster) throws StateStoreException { - ID id = new ID(entity, cluster); - if (!entityStates.containsKey(id.getEntityKey())) { - throw new StateStoreException("Entity with key, " + id.getEntityKey() + " does not exist."); + EntityClusterID id = new EntityClusterID(entity, cluster); + if (!entityStates.containsKey(id.getEntityID().getKey())) { + throw new StateStoreException("Entity with key, " + id.getEntityID().getKey() + " does not exist."); } - Collection<InstanceState> instances = new ArrayList<InstanceState>(); + Collection<InstanceState> instances = new ArrayList<>(); for (Map.Entry<String, InstanceState> instanceState : instanceStates.entrySet()) { if (instanceState.getKey().startsWith(id.toString())) { instances.add(instanceState.getValue()); @@ -156,7 +158,7 @@ public final class InMemoryStateStore extends AbstractStateStore { @Override public Collection<InstanceState> getExecutionInstances(Entity entity, String cluster, Collection<InstanceState.STATE> states) throws StateStoreException { - ID id = new ID(entity, cluster); + EntityClusterID id = new EntityClusterID(entity, cluster); return getExecutionInstances(id, states); } @@ -164,7 +166,7 @@ public final class InMemoryStateStore extends AbstractStateStore { public Collection<InstanceState> getExecutionInstances(Entity entity, String cluster, Collection<InstanceState.STATE> states, DateTime start, DateTime end) throws StateStoreException { List<InstanceState> instancesToReturn = new ArrayList<>(); - ID id = new ID(entity, cluster); + EntityClusterID id = new EntityClusterID(entity, cluster); for (InstanceState state : getExecutionInstances(id, states)) { ExecutionInstance instance = state.getInstance(); DateTime instanceTime = instance.getInstanceTime(); @@ -179,9 +181,9 @@ public final class InMemoryStateStore extends AbstractStateStore { } @Override - public Collection<InstanceState> getExecutionInstances(ID entityId, Collection<InstanceState.STATE> states) - throws StateStoreException { - Collection<InstanceState> instances = new ArrayList<InstanceState>(); + public Collection<InstanceState> getExecutionInstances(EntityClusterID entityId, + Collection<InstanceState.STATE> states) throws StateStoreException { + Collection<InstanceState> instances = new ArrayList<>(); for (Map.Entry<String, InstanceState> instanceState : instanceStates.entrySet()) { if (instanceState.getKey().startsWith(entityId.toString()) && states.contains(instanceState.getValue().getCurrentState())) { @@ -193,9 +195,9 @@ public final class InMemoryStateStore extends AbstractStateStore { @Override public InstanceState getLastExecutionInstance(Entity entity, String cluster) throws StateStoreException { - ID id = new ID(entity, cluster); - if (!entityStates.containsKey(id.getEntityKey())) { - throw new StateStoreException("Entity with key, " + id.getEntityKey() + " does not exist."); + EntityClusterID id = new EntityClusterID(entity, cluster); + if (!entityStates.containsKey(id.getEntityID().getKey())) { + throw new StateStoreException("Entity with key, " + id.getEntityID().getKey() + " does not exist."); } InstanceState latestState = null; // TODO : Very crude. Iterating over all entries and getting the last one. @@ -208,14 +210,14 @@ public final class InMemoryStateStore extends AbstractStateStore { } @Override - public boolean executionInstanceExists(ID instanceId) { + public boolean executionInstanceExists(InstanceID instanceId) { return instanceStates.containsKey(instanceId.toString()); } @Override - public void deleteExecutionInstances(ID entityId) { + public void deleteExecutionInstances(EntityID entityId) { for (String instanceKey : Lists.newArrayList(instanceStates.keySet())) { - if (instanceKey.startsWith(entityId.getEntityKey())) { + if (instanceKey.startsWith(entityId.getKey())) { instanceStates.remove(instanceKey); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java index d6a4b49..483d9e6 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java @@ -19,7 +19,9 @@ package org.apache.falcon.state.store; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.exception.StateStoreException; -import org.apache.falcon.state.ID; +import org.apache.falcon.state.EntityClusterID; +import org.apache.falcon.state.EntityID; +import org.apache.falcon.state.InstanceID; import org.apache.falcon.state.InstanceState; import org.joda.time.DateTime; @@ -43,7 +45,7 @@ public interface InstanceStateStore { * @return Execution instance corresponding to the name. * @throws StateStoreException - When instance does not exist */ - InstanceState getExecutionInstance(ID instanceId) throws StateStoreException; + InstanceState getExecutionInstance(InstanceID instanceId) throws StateStoreException; /** * Updates an execution instance in the store. @@ -83,13 +85,13 @@ public interface InstanceStateStore { DateTime start, DateTime end) throws StateStoreException; /** - * @param entityId + * @param entityClusterID * @param states * @return - All execution instance for an given entityKey (that includes the cluster name) * @throws StateStoreException */ - Collection<InstanceState> getExecutionInstances(ID entityId, Collection<InstanceState.STATE> states) - throws StateStoreException; + Collection<InstanceState> getExecutionInstances(EntityClusterID entityClusterID, + Collection<InstanceState.STATE> states) throws StateStoreException; /** * @param entity * @param cluster @@ -102,12 +104,12 @@ public interface InstanceStateStore { * @param instanceId * @return true, if instance exists. */ - boolean executionInstanceExists(ID instanceId); + boolean executionInstanceExists(InstanceID instanceId); /** * Delete instances of a given entity. * * @param entityId */ - void deleteExecutionInstances(ID entityId); + void deleteExecutionInstances(EntityID entityId); } http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java index 3a5024a..d7d157f 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java @@ -32,8 +32,8 @@ import org.apache.falcon.execution.FalconExecutionService; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesSummaryResult; +import org.apache.falcon.state.EntityID; import org.apache.falcon.state.EntityState; -import org.apache.falcon.state.ID; import org.apache.falcon.state.InstanceState; import org.apache.falcon.state.store.AbstractStateStore; import org.apache.falcon.state.store.StateStore; @@ -86,12 +86,12 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { @Override public boolean isActive(Entity entity) throws FalconException { - return STATE_STORE.getEntity(new ID(entity)).getCurrentState() != EntityState.STATE.SUBMITTED; + return STATE_STORE.getEntity(new EntityID(entity)).getCurrentState() != EntityState.STATE.SUBMITTED; } @Override public boolean isSuspended(Entity entity) throws FalconException { - return STATE_STORE.getEntity(new ID(entity)) + return STATE_STORE.getEntity(new EntityID(entity)) .getCurrentState().equals(EntityState.STATE.SUSPENDED); } http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java index b2f9e59..bff92c9 100644 --- a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java +++ b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java @@ -36,8 +36,11 @@ import org.apache.falcon.notification.service.impl.DataAvailabilityService; import org.apache.falcon.notification.service.impl.JobCompletionService; import org.apache.falcon.notification.service.impl.SchedulerService; import org.apache.falcon.service.Services; +import org.apache.falcon.state.EntityClusterID; +import org.apache.falcon.state.EntityID; import org.apache.falcon.state.EntityState; import org.apache.falcon.state.ID; +import org.apache.falcon.state.InstanceID; import org.apache.falcon.state.InstanceState; import org.apache.falcon.state.store.AbstractStateStore; import org.apache.falcon.state.store.InMemoryStateStore; @@ -135,7 +138,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase { storeEntity(EntityType.PROCESS, "summarize1"); Process process = getStore().get(EntityType.PROCESS, "summarize1"); Assert.assertNotNull(process); - ID processKey = new ID(process); + EntityID processKey = new EntityID(process); String clusterName = dfsCluster.getCluster().getName(); // Schedule a process @@ -183,10 +186,12 @@ public class FalconExecutionServiceTest extends AbstractTestBase { Process process = getStore().get(EntityType.PROCESS, "summarize2"); Assert.assertNotNull(process); String clusterName = dfsCluster.getCluster().getName(); - ID processID = new ID(process, clusterName); + EntityID processID = new EntityID(process); + EntityClusterID executorID = new EntityClusterID(process, clusterName); // Schedule a process - Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED); + Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), + EntityState.STATE.SUBMITTED); FalconExecutionService.get().schedule(process); // Simulate two time notifications @@ -218,7 +223,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase { instance1.getInstance().getId()); Mockito.verify(mockDataService).unregister(FalconExecutionService.get(), instance2.getInstance().getId()); - Mockito.verify(mockTimeService).unregister(FalconExecutionService.get(), processID); + Mockito.verify(mockTimeService).unregister(FalconExecutionService.get(), executorID); FalconExecutionService.get().resume(process); Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.READY); @@ -260,8 +265,8 @@ public class FalconExecutionServiceTest extends AbstractTestBase { Process process = getStore().get(EntityType.PROCESS, "summarize4"); Assert.assertNotNull(process); String clusterName = dfsCluster.getCluster().getName(); - ID processID = new ID(process, clusterName); - + EntityID processID = new EntityID(process); + EntityClusterID executorID = new EntityClusterID(process, clusterName); // Schedule a process Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED); FalconExecutionService.get().schedule(process); @@ -296,7 +301,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase { FalconExecutionService.get().delete(process); // Deregister from notification services - Mockito.verify(mockTimeService).unregister(FalconExecutionService.get(), processID); + Mockito.verify(mockTimeService).unregister(FalconExecutionService.get(), executorID); } @Test @@ -305,7 +310,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase { Process process = getStore().get(EntityType.PROCESS, "summarize3"); Assert.assertNotNull(process); String clusterName = dfsCluster.getCluster().getName(); - ID processID = new ID(process, clusterName); + EntityID processID = new EntityID(process); // Schedule a process Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED); @@ -334,7 +339,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase { Process process = getStore().get(EntityType.PROCESS, "summarize6"); Assert.assertNotNull(process); String clusterName = dfsCluster.getCluster().getName(); - ID processID = new ID(process, clusterName); + EntityID processID = new EntityID(process); // Schedule a process Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED); @@ -356,7 +361,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase { Process process = getStore().get(EntityType.PROCESS, "summarize5"); Assert.assertNotNull(process); String clusterName = dfsCluster.getCluster().getName(); - ID processID = new ID(process, clusterName); + EntityID processID = new EntityID(process); // Schedule a process Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED); @@ -434,7 +439,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase { Process process = getStore().get(EntityType.PROCESS, "summarize7"); Assert.assertNotNull(process); String clusterName = dfsCluster.getCluster().getName(); - ID processID = new ID(process, clusterName); + EntityID processID = new EntityID(process); // Store couple of instances in store stateStore.getEntity(processID).setCurrentState(EntityState.STATE.SCHEDULED); @@ -468,7 +473,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase { Process process = getStore().get(EntityType.PROCESS, name); Assert.assertNotNull(process); String clusterName = dfsCluster.getCluster().getName(); - ID processID = new ID(process, clusterName); + EntityID processID = new EntityID(process); // Schedule the process FalconExecutionService.get().schedule(process); @@ -517,7 +522,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase { } private Event createEvent(NotificationServicesRegistry.SERVICE type, Process process, String cluster) { - ID id = new ID(process, cluster); + EntityClusterID id = new EntityClusterID(process, cluster); switch (type) { case TIME: Date start = process.getClusters().getClusters().get(0).getValidity().getStart(); @@ -536,7 +541,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase { } private Event createEvent(NotificationServicesRegistry.SERVICE type, ExecutionInstance instance) { - ID id = new ID(instance); + ID id = new InstanceID(instance); switch (type) { case DATA: DataEvent dataEvent = new DataEvent(id, new Path("/projects/falcon/clicks"), LocationType.DATA, http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java index 36f1fd1..34965f2 100644 --- a/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java +++ b/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java @@ -23,7 +23,7 @@ import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.execution.NotificationHandler; import org.apache.falcon.notification.service.event.Event; import org.apache.falcon.notification.service.impl.AlarmService; -import org.apache.falcon.state.ID; +import org.apache.falcon.state.EntityID; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.mockito.Mockito; @@ -57,8 +57,8 @@ public class AlarmServiceTest { Process mockProcess = new Process(); mockProcess.setName("test"); - ID id = new ID(mockProcess); - id.setCluster("testCluster"); + EntityID id = new EntityID(mockProcess); +// id.setCluster("testCluster"); AlarmService.AlarmRequestBuilder request = new AlarmService.AlarmRequestBuilder(handler, id); http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java index b4a0f35..001f466 100644 --- a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java +++ b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java @@ -35,7 +35,9 @@ import org.apache.falcon.notification.service.impl.DataAvailabilityService; import org.apache.falcon.notification.service.impl.JobCompletionService; import org.apache.falcon.notification.service.impl.SchedulerService; import org.apache.falcon.service.Services; +import org.apache.falcon.state.EntityClusterID; import org.apache.falcon.state.ID; +import org.apache.falcon.state.InstanceID; import org.apache.falcon.state.InstanceState; import org.apache.falcon.state.store.AbstractStateStore; import org.apache.falcon.state.store.InMemoryStateStore; @@ -147,16 +149,16 @@ public class SchedulerServiceTest extends AbstractTestBase { Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1)); // Simulate the completion of previous instance. stateStore.getExecutionInstance(instance1.getId()).setCurrentState(STATE.SUCCEEDED); - scheduler.onEvent(new JobCompletedEvent(new ID(mockProcess, cluster), WorkflowJob.Status.SUCCEEDED, + scheduler.onEvent(new JobCompletedEvent(new EntityClusterID(mockProcess, cluster), WorkflowJob.Status.SUCCEEDED, DateTime.now())); // When an instance completes instance2 should get scheduled next iteration - Thread.sleep(100); + Thread.sleep(300); Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), new Integer(1)); // Simulate another completion and ensure instance3 runs. stateStore.getExecutionInstance(instance2.getId()).setCurrentState(STATE.SUCCEEDED); - scheduler.onEvent(new JobCompletedEvent(new ID(mockProcess, cluster), WorkflowJob.Status.SUCCEEDED, + scheduler.onEvent(new JobCompletedEvent(new EntityClusterID(mockProcess, cluster), WorkflowJob.Status.SUCCEEDED, DateTime.now())); - Thread.sleep(100); + Thread.sleep(300); Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance3), new Integer(1)); } @@ -193,7 +195,7 @@ public class SchedulerServiceTest extends AbstractTestBase { Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), new Integer(1)); Mockito.verify(handler, Mockito.times(1)).onEvent(Mockito.any(JobScheduledEvent.class)); stateStore.getExecutionInstance(instance2.getId()).setCurrentState(STATE.SUCCEEDED); - scheduler.onEvent(new JobCompletedEvent(new ID(mockProcess, cluster, instance2.getInstanceTime()), + scheduler.onEvent(new JobCompletedEvent(new InstanceID(mockProcess, cluster, instance2.getInstanceTime()), WorkflowJob.Status.SUCCEEDED, DateTime.now())); // Dependency now satisfied. Now, the first instance should get scheduled after retry delay. Thread.sleep(100); @@ -300,8 +302,9 @@ public class SchedulerServiceTest extends AbstractTestBase { JobScheduledEvent scheduledEvent = ((JobScheduledEvent) event); Process p = (Process) process.copy(); p.setName(scheduledEvent.getTarget().getEntityName()); - ProcessExecutionInstance instance = new ProcessExecutionInstance(p, - scheduledEvent.getTarget().getInstanceTime(), cluster); + InstanceID instanceID = (InstanceID) scheduledEvent.getTarget(); + DateTime instanceTime = new DateTime(instanceID.getInstanceTime()); + ProcessExecutionInstance instance = new ProcessExecutionInstance(p, instanceTime, cluster); InstanceState state = new InstanceState(instance).setCurrentState(STATE.RUNNING); if (!stateStore.executionInstanceExists(instance.getId())) { stateStore.putExecutionInstance(state); http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java b/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java index 95dd5ae..073f73e 100644 --- a/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java +++ b/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java @@ -18,12 +18,12 @@ package org.apache.falcon.predicate; import org.apache.falcon.FalconException; +import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.notification.service.event.TimeElapsedEvent; -import org.apache.falcon.state.ID; +import org.apache.falcon.state.EntityID; import org.joda.time.DateTime; import org.testng.Assert; import org.testng.annotations.Test; -import org.apache.falcon.entity.v0.process.Process; /** * Tests the predicate class. @@ -35,7 +35,7 @@ public class PredicateTest { Process process = new Process(); process.setName("test"); DateTime now = DateTime.now(); - TimeElapsedEvent te = new TimeElapsedEvent(new ID(process), now, now, now); + TimeElapsedEvent te = new TimeElapsedEvent(new EntityID(process), now, now, now); Predicate.getPredicate(te); } http://git-wip-us.apache.org/repos/asf/falcon/blob/c4958773/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java b/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java index d27ac7e..43c3c54 100644 --- a/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java +++ b/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java @@ -62,7 +62,7 @@ public class InstanceStateServiceTest { public void testLifeCycle() throws FalconException { StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.TRIGGER, listener); InstanceState instanceFromStore = AbstractStateStore.get() - .getExecutionInstance(new ID(mockInstance)); + .getExecutionInstance(new InstanceID(mockInstance)); Mockito.verify(listener).onTrigger(mockInstance); Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.WAITING)); StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.CONDITIONS_MET, listener);
