Repository: falcon Updated Branches: refs/heads/master 7e554e791 -> 3251e3aa1
FALCON-1826 Execution order not honoured when instances are KILLED The problem was that unregistering an instance from the scheduler did not remove it immediately. Instead, the instance was added to instancesToIgnore and was only skipped later, when the scheduler got around to running it. This caused problems because instancesToIgnore had to be checked in more than one place. The fix is to remove the instance synchronously, on unregister, from the map of instances awaiting scheduling (executorAwaitedInstances). Author: Pallavi Rao <[email protected]> Reviewers: "Pavan Kumar Kolamuri <[email protected]>" Closes #58 from pallavi-rao/1826 and squashes the following commits: 9ace53d [Pallavi Rao] FALCON-1826 Addressed review comments dec29c0 [Pallavi Rao] FALCON-1826 Execution order not honoured when instances are KILLED Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/3251e3aa Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/3251e3aa Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/3251e3aa Branch: refs/heads/master Commit: 3251e3aa17d1f6f15e49e6d2c7c4e75c7260b387 Parents: 7e554e7 Author: Pallavi Rao <[email protected]> Authored: Fri Mar 4 12:18:26 2016 +0530 Committer: pavankumar526 <[email protected]> Committed: Fri Mar 4 12:18:26 2016 +0530 ---------------------------------------------------------------------- .../service/impl/SchedulerService.java | 35 ++++++++------------ .../service/SchedulerServiceTest.java | 11 +++--- 2 files changed, 20 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/3251e3aa/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java index f5a7c86..635fec4 
100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java @@ -17,7 +17,6 @@ */ package org.apache.falcon.notification.service.impl; -import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -86,7 +85,6 @@ public class SchedulerService implements FalconNotificationService, Notification private static final StateStore STATE_STORE = AbstractStateStore.get(); - private Cache<InstanceID, Object> instancesToIgnore; // TODO : limit the no. of awaiting instances per entity private LoadingCache<EntityClusterID, SortedMap<Integer, ExecutionInstance>> executorAwaitedInstances; @@ -96,22 +94,28 @@ public class SchedulerService implements FalconNotificationService, Notification if (request.getInstance() == null) { throw new NotificationServiceException("Request must contain an instance."); } - // When the instance is getting rescheduled for run. As in the case of suspend and resume. - Object obj = instancesToIgnore.getIfPresent(request.getInstance().getId()); - if (obj != null) { - instancesToIgnore.invalidate(request.getInstance().getId()); - } LOG.debug("Received request to schedule instance {} with sequence {}.", request.getInstance().getId(), request.getInstance().getInstanceSequence()); runQueue.execute(new InstanceRunner(request)); } @Override - public void unregister(NotificationHandler handler, ID listenerID) { + public void unregister(NotificationHandler handler, ID listenerID) throws NotificationServiceException { // If ID is that of an entity, do nothing if (listenerID instanceof InstanceID) { - // Not efficient to iterate over elements to remove this. Add to ignore list. 
- instancesToIgnore.put((InstanceID) listenerID, new Object()); + try { + InstanceID instanceID = (InstanceID) listenerID; + SortedMap<Integer, ExecutionInstance> instances = executorAwaitedInstances.get(instanceID + .getEntityClusterID()); + if (instances != null && !instances.isEmpty()) { + synchronized (instances) { + instances.remove(STATE_STORE.getExecutionInstance(instanceID) + .getInstance().getInstanceSequence()); + } + } + } catch (Exception e) { + throw new NotificationServiceException(e); + } } } @@ -155,10 +159,6 @@ public class SchedulerService implements FalconNotificationService, Notification .removalListener(this) .build(instanceCacheLoader); - instancesToIgnore = CacheBuilder.newBuilder() - .expireAfterWrite(1, TimeUnit.HOURS) - .concurrencyLevel(1) - .build(); // Interested in all job completion events. JobCompletionNotificationRequest completionRequest = (JobCompletionNotificationRequest) NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.JOB_COMPLETION) @@ -243,7 +243,6 @@ public class SchedulerService implements FalconNotificationService, Notification @Override public void destroy() throws FalconException { runQueue.shutdownNow(); - instancesToIgnore.invalidateAll(); } private void notifyFailureEvent(JobScheduleNotificationRequest request) throws FalconException { @@ -290,12 +289,6 @@ public class SchedulerService implements FalconNotificationService, Notification @Override public void run() { try { - // If de-registered - if (instancesToIgnore.getIfPresent(instance.getId()) != null) { - LOG.debug("Instance {} has been deregistered. 
Ignoring.", instance.getId()); - instancesToIgnore.invalidate(instance.getId()); - return; - } LOG.debug("Received request to run instance {}", instance.getId()); if (checkConditions()) { String externalId = instance.getExternalID(); http://git-wip-us.apache.org/repos/asf/falcon/blob/3251e3aa/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java index a7ce748..a442738 100644 --- a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java +++ b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java @@ -242,7 +242,7 @@ public class SchedulerServiceTest extends AbstractTestBase { public void testDeRegistration() throws Exception { storeEntity(EntityType.PROCESS, "summarize4"); Process mockProcess = getStore().get(EntityType.PROCESS, "summarize4"); - mockProcess.setParallel(3); + mockProcess.setParallel(2); Date startTime = EntityUtil.getStartTime(mockProcess, cluster); ExecutionInstance instance1 = new ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster); // Schedule 3 instances. 
@@ -263,14 +263,15 @@ public class SchedulerServiceTest extends AbstractTestBase { request3.setInstance(instance3); scheduler.register(request3.build()); - // Abort second instance - scheduler.unregister(handler, instance2.getId()); + // Abort third instance + stateStore.putExecutionInstance(new InstanceState(instance3)); + scheduler.unregister(handler, instance3.getId()); Thread.sleep(100); Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1)); - Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance3), new Integer(1)); + Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), new Integer(1)); // Second instance should not run. - Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), null); + Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance3), null); } @Test
