Repository: falcon
Updated Branches:
  refs/heads/master 7e554e791 -> 3251e3aa1


FALCON-1826 Execution order not honoured when instances are KILLED

The problem was that unregistering an instance from the scheduler did not 
remove it immediately. Instead, the instance was added to instancesToIgnore 
and only skipped later, at scheduling time. This causes issues because 
instancesToIgnore would need to be checked in more than one place.

The fix is to remove the instance from the "awaitingSchedule" list (the 
executorAwaitedInstances map in SchedulerService) synchronously on unregister.

Author: Pallavi Rao <[email protected]>

Reviewers: "Pavan Kumar Kolamuri <[email protected]>"

Closes #58 from pallavi-rao/1826 and squashes the following commits:

9ace53d [Pallavi Rao] FALCON-1826 Addressed review comments
dec29c0 [Pallavi Rao] FALCON-1826 Execution order not honoured when instances 
are KILLED


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/3251e3aa
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/3251e3aa
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/3251e3aa

Branch: refs/heads/master
Commit: 3251e3aa17d1f6f15e49e6d2c7c4e75c7260b387
Parents: 7e554e7
Author: Pallavi Rao <[email protected]>
Authored: Fri Mar 4 12:18:26 2016 +0530
Committer: pavankumar526 <[email protected]>
Committed: Fri Mar 4 12:18:26 2016 +0530

----------------------------------------------------------------------
 .../service/impl/SchedulerService.java          | 35 ++++++++------------
 .../service/SchedulerServiceTest.java           | 11 +++---
 2 files changed, 20 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/3251e3aa/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
 
b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
index f5a7c86..635fec4 100644
--- 
a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
+++ 
b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
@@ -17,7 +17,6 @@
  */
 package org.apache.falcon.notification.service.impl;
 
-import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -86,7 +85,6 @@ public class SchedulerService implements 
FalconNotificationService, Notification
 
     private static final StateStore STATE_STORE = AbstractStateStore.get();
 
-    private Cache<InstanceID, Object> instancesToIgnore;
     // TODO : limit the no. of awaiting instances per entity
     private LoadingCache<EntityClusterID, SortedMap<Integer, 
ExecutionInstance>> executorAwaitedInstances;
 
@@ -96,22 +94,28 @@ public class SchedulerService implements 
FalconNotificationService, Notification
         if (request.getInstance() == null) {
             throw new NotificationServiceException("Request must contain an 
instance.");
         }
-        // When the instance is getting rescheduled for run. As in the case of 
suspend and resume.
-        Object obj = 
instancesToIgnore.getIfPresent(request.getInstance().getId());
-        if (obj != null) {
-            instancesToIgnore.invalidate(request.getInstance().getId());
-        }
         LOG.debug("Received request to schedule instance {} with sequence 
{}.", request.getInstance().getId(),
                 request.getInstance().getInstanceSequence());
         runQueue.execute(new InstanceRunner(request));
     }
 
     @Override
-    public void unregister(NotificationHandler handler, ID listenerID) {
+    public void unregister(NotificationHandler handler, ID listenerID) throws 
NotificationServiceException {
         // If ID is that of an entity, do nothing
         if (listenerID instanceof InstanceID) {
-            // Not efficient to iterate over elements to remove this. Add to 
ignore list.
-            instancesToIgnore.put((InstanceID) listenerID, new Object());
+            try {
+                InstanceID instanceID = (InstanceID) listenerID;
+                SortedMap<Integer, ExecutionInstance> instances = 
executorAwaitedInstances.get(instanceID
+                            .getEntityClusterID());
+                if (instances != null && !instances.isEmpty()) {
+                    synchronized (instances) {
+                        
instances.remove(STATE_STORE.getExecutionInstance(instanceID)
+                                .getInstance().getInstanceSequence());
+                    }
+                }
+            } catch (Exception e) {
+                throw new NotificationServiceException(e);
+            }
         }
     }
 
@@ -155,10 +159,6 @@ public class SchedulerService implements 
FalconNotificationService, Notification
                 .removalListener(this)
                 .build(instanceCacheLoader);
 
-        instancesToIgnore = CacheBuilder.newBuilder()
-                .expireAfterWrite(1, TimeUnit.HOURS)
-                .concurrencyLevel(1)
-                .build();
         // Interested in all job completion events.
         JobCompletionNotificationRequest completionRequest = 
(JobCompletionNotificationRequest)
                 
NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.JOB_COMPLETION)
@@ -243,7 +243,6 @@ public class SchedulerService implements 
FalconNotificationService, Notification
     @Override
     public void destroy() throws FalconException {
         runQueue.shutdownNow();
-        instancesToIgnore.invalidateAll();
     }
 
     private void notifyFailureEvent(JobScheduleNotificationRequest request) 
throws FalconException {
@@ -290,12 +289,6 @@ public class SchedulerService implements 
FalconNotificationService, Notification
         @Override
         public void run() {
             try {
-                // If de-registered
-                if (instancesToIgnore.getIfPresent(instance.getId()) != null) {
-                    LOG.debug("Instance {} has been deregistered. Ignoring.", 
instance.getId());
-                    instancesToIgnore.invalidate(instance.getId());
-                    return;
-                }
                 LOG.debug("Received request to run instance {}", 
instance.getId());
                 if (checkConditions()) {
                     String externalId = instance.getExternalID();

http://git-wip-us.apache.org/repos/asf/falcon/blob/3251e3aa/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
 
b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
index a7ce748..a442738 100644
--- 
a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
+++ 
b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
@@ -242,7 +242,7 @@ public class SchedulerServiceTest extends AbstractTestBase {
     public void testDeRegistration() throws Exception {
         storeEntity(EntityType.PROCESS, "summarize4");
         Process mockProcess = getStore().get(EntityType.PROCESS, "summarize4");
-        mockProcess.setParallel(3);
+        mockProcess.setParallel(2);
         Date startTime = EntityUtil.getStartTime(mockProcess, cluster);
         ExecutionInstance instance1 = new 
ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster);
         // Schedule 3 instances.
@@ -263,14 +263,15 @@ public class SchedulerServiceTest extends 
AbstractTestBase {
         request3.setInstance(instance3);
         scheduler.register(request3.build());
 
-        // Abort second instance
-        scheduler.unregister(handler, instance2.getId());
+        // Abort third instance
+        stateStore.putExecutionInstance(new InstanceState(instance3));
+        scheduler.unregister(handler, instance3.getId());
 
         Thread.sleep(100);
         Assert.assertEquals(((MockDAGEngine) 
mockDagEngine).getTotalRuns(instance1), new Integer(1));
-        Assert.assertEquals(((MockDAGEngine) 
mockDagEngine).getTotalRuns(instance3), new Integer(1));
+        Assert.assertEquals(((MockDAGEngine) 
mockDagEngine).getTotalRuns(instance2), new Integer(1));
         // Second instance should not run.
-        Assert.assertEquals(((MockDAGEngine) 
mockDagEngine).getTotalRuns(instance2), null);
+        Assert.assertEquals(((MockDAGEngine) 
mockDagEngine).getTotalRuns(instance3), null);
     }
 
     @Test

Reply via email to