FALCON-1568 Process Instances are not getting scheduled in Falcon Native 
Scheduler


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/97e35874
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/97e35874
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/97e35874

Branch: refs/heads/master
Commit: 97e35874fa62103ba40a8bbb908c79c4f3b79a2d
Parents: 3487f71
Author: Pallavi Rao <[email protected]>
Authored: Mon Dec 7 10:47:16 2015 +0530
Committer: Pallavi Rao <[email protected]>
Committed: Mon Dec 7 10:47:16 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                        |  4 ++++
 .../falcon/execution/ProcessExecutionInstance.java |  4 +++-
 .../service/impl/JobCompletionService.java         | 17 +++++++++++------
 3 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/97e35874/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 193a26e..4690c95 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -58,6 +58,10 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+    FALCON-1598 Flaky test : EntityManagerJerseyIT.testDuplicateDeleteCommands 
(Narayan Periwal via Pallavi Rao)
+
+    FALCON-1568 Process Instances are not getting scheduled in Falcon Native 
Scheduler (Pallavi Rao)
+
     FALCON-1595 In secure cluster, Falcon server loses ability to communicate 
with HDFS over time (Balu Vellanki)
 
     FALCON-1490 Fixing inconsistencies in filterBy behavior (Narayan Periwal 
via Balu Vellanki)

http://git-wip-us.apache.org/repos/asf/falcon/blob/97e35874/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
 
b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
index cff4a73..f3beabc 100644
--- 
a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
+++ 
b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
@@ -320,6 +320,8 @@ public class ProcessExecutionInstance extends 
ExecutionInstance {
 
     @Override
     public void destroy() throws FalconException {
-        NotificationServicesRegistry.unregister(executionService, getId());
+        // Only Registration to Data service happens via process execution 
instance. So, handle just that.
+        
NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.DATA)
+                .unregister(executionService, getId());
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/97e35874/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
----------------------------------------------------------------------
diff --git 
a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
 
b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
index 501c6aa..23f2b4e 100644
--- 
a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
+++ 
b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
@@ -41,12 +41,13 @@ import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
 /**
  * This notification service notifies {@link NotificationHandler} when an 
external job
@@ -57,7 +58,7 @@ public class JobCompletionService implements 
FalconNotificationService, Workflow
     private static final Logger LOG = 
LoggerFactory.getLogger(JobCompletionService.class);
     private static final DateTimeZone UTC = DateTimeZone.UTC;
 
-    private List<NotificationHandler> listeners = 
Collections.synchronizedList(new ArrayList<NotificationHandler>());
+    private Set<NotificationHandler> listeners = 
Collections.synchronizedSet(new HashSet<NotificationHandler>());
 
     @Override
     public void register(NotificationRequest notifRequest) throws 
NotificationServiceException {
@@ -140,9 +141,13 @@ public class JobCompletionService implements 
FalconNotificationService, Workflow
 
     private void onEnd(WorkflowExecutionContext context, WorkflowJob.Status 
status) throws FalconException {
         JobCompletedEvent event = new 
JobCompletedEvent(constructCallbackID(context), status, getEndTime(context));
-        for (NotificationHandler handler : listeners) {
-            LOG.debug("Notifying {} with event {}", handler, 
event.getTarget());
-            handler.onEvent(event);
+        synchronized (listeners) {
+            Iterator<NotificationHandler> iterator = listeners.iterator();
+            while(iterator.hasNext()) {
+                NotificationHandler handler = iterator.next();
+                LOG.debug("Notifying {} with event {}", handler, 
event.getTarget());
+                handler.onEvent(event);
+            }
         }
     }
 

Reply via email to