Repository: falcon Updated Branches: refs/heads/master f9c8feac1 -> 3059980c6
FALCON-1572 Only one instance is running in a process when run using Native Scheduler. Contributed by Pallavi Rao. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/3059980c Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/3059980c Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/3059980c Branch: refs/heads/master Commit: 3059980c65400bc2dc0100cdf3b1a781927a3203 Parents: f9c8fea Author: Ajay Yadava <[email protected]> Authored: Mon Dec 14 17:50:22 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Mon Dec 14 17:50:22 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + scheduler/pom.xml | 6 ++ .../apache/falcon/execution/EntityExecutor.java | 5 ++ .../falcon/execution/ExecutionInstance.java | 5 ++ .../execution/FalconExecutionService.java | 5 ++ .../falcon/execution/NotificationHandler.java | 24 +++++++ .../service/impl/JobCompletionService.java | 11 +++- .../service/impl/SchedulerService.java | 69 ++++++++++---------- .../service/SchedulerServiceTest.java | 10 +++ 9 files changed, 100 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/3059980c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c1f09d3..4d42881 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -62,6 +62,8 @@ Trunk (Unreleased) OPTIMIZATIONS BUG FIXES + FALCON-1572 Only one instance is running in a process when run using Native Scheduler(Pallavi Rao via Ajay Yadava) + FALCON-1660 Examples directory missing in distributed mode(Praveen Adlakha via Ajay Yadava) FALCON-1647 Unable to create feed : FilePermission error under cluster staging directory(Balu Vellanki via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/3059980c/scheduler/pom.xml ---------------------------------------------------------------------- diff --git a/scheduler/pom.xml b/scheduler/pom.xml index 336997d..c305651 100644 --- a/scheduler/pom.xml +++ b/scheduler/pom.xml @@ -137,6 +137,12 @@ <artifactId>derby</artifactId> <version>10.10.1.1</version> </dependency> + + <dependency> + <groupId>commons-dbcp</groupId> + <artifactId>commons-dbcp</artifactId> + <version>${commons-dbcp.version}</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/falcon/blob/3059980c/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java index 88d88c1..c9c0f42 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java @@ -108,4 +108,9 @@ public abstract class EntityExecutor implements NotificationHandler, InstanceSta public EntityClusterID getId() { return id; } + + @Override + public PRIORITY getPriority() { + return PRIORITY.MEDIUM; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/3059980c/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java index 5f96d3f..3cc8a25 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java @@ -202,4 +202,9 @@ public abstract class ExecutionInstance implements NotificationHandler { * @throws FalconException */ public abstract void destroy() throws FalconException; + + @Override + public PRIORITY getPriority() { + return PRIORITY.MEDIUM; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/3059980c/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java index b6741a4..01208d6 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java @@ -133,6 +133,11 @@ public final class FalconExecutionService implements FalconService, EntityStateC } @Override + public PRIORITY getPriority() { + return PRIORITY.HIGH; + } + + @Override public void onSubmit(Entity entity) throws FalconException { // Do nothing } http://git-wip-us.apache.org/repos/asf/falcon/blob/3059980c/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java b/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java index b071f5f..2a2589e 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java @@ -24,6 +24,23 @@ import org.apache.falcon.notification.service.event.Event; * An interface that every class that handles notifications from notification services must implement. */ public interface NotificationHandler { + + /** + * When there are multiple notification handlers for the same event, + * the priority determines which handler gets notified first. + */ + enum PRIORITY {HIGH(5), MEDIUM(3), LOW(0); + + private final int priority; + + PRIORITY(int i) { + this.priority = i; + } + + public int getPriority() { + return priority; + } + } /** * The method a notification service calls to onEvent an event. * @@ -31,4 +48,11 @@ public interface NotificationHandler { * @throws FalconException */ void onEvent(Event event) throws FalconException; + + /** + * When there are multiple notification handlers for the same event, + * the priority determines which handler gets notified first. + * @return + */ + PRIORITY getPriority(); } http://git-wip-us.apache.org/repos/asf/falcon/blob/3059980c/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java index 23f2b4e..4278d3f 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java @@ -17,6 +17,8 @@ */ package org.apache.falcon.notification.service.impl; +import java.util.Comparator; +import java.util.TreeSet; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.entity.EntityUtil; @@ -43,7 +45,6 @@ import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Properties; @@ -58,7 +59,13 @@ public class JobCompletionService implements FalconNotificationService, Workflow private static final Logger LOG = LoggerFactory.getLogger(JobCompletionService.class); private static final DateTimeZone UTC = DateTimeZone.UTC; - private Set<NotificationHandler> listeners = Collections.synchronizedSet(new HashSet<NotificationHandler>()); + private Set<NotificationHandler> listeners = Collections.synchronizedSet(new TreeSet<>( + new Comparator<NotificationHandler>() { + @Override + public int compare(NotificationHandler o1, NotificationHandler o2) { + return Integer.compare(o1.getPriority().getPriority(), o2.getPriority().getPriority()); + } + })); @Override public void register(NotificationRequest notifRequest) throws NotificationServiceException { http://git-wip-us.apache.org/repos/asf/falcon/blob/3059980c/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java index ace8444..fb11091 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java @@ -23,6 +23,9 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; +import java.util.Collections; +import java.util.SortedMap; +import java.util.TreeMap; import org.apache.falcon.FalconException; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.Entity; @@ -55,9 +58,7 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; import java.util.ArrayList; -import java.util.Collection; import java.util.Comparator; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.PriorityBlockingQueue; @@ -85,7 +86,7 @@ public class SchedulerService implements FalconNotificationService, Notification private Cache<InstanceID, Object> instancesToIgnore; // TODO : limit the no. of awaiting instances per entity - private LoadingCache<EntityClusterID, List<ExecutionInstance>> executorAwaitedInstances; + private LoadingCache<EntityClusterID, SortedMap<Integer, ExecutionInstance>> executorAwaitedInstances; @Override public void register(NotificationRequest notifRequest) throws NotificationServiceException { @@ -108,7 +109,6 @@ public class SchedulerService implements FalconNotificationService, Notification // Not efficient to iterate over elements to remove this. Add to ignore list. instancesToIgnore.put((InstanceID) listenerID, new Object()); } - } @Override @@ -130,15 +130,16 @@ public class SchedulerService implements FalconNotificationService, Notification PriorityBlockingQueue<Runnable> pq = new PriorityBlockingQueue<Runnable>(20, new PriorityComparator()); runQueue = new ThreadPoolExecutor(1, numThreads, 0L, TimeUnit.MILLISECONDS, pq); - CacheLoader instanceCacheLoader = new CacheLoader<EntityClusterID, Collection<ExecutionInstance>>() { + CacheLoader instanceCacheLoader = new CacheLoader<EntityClusterID, SortedMap<Integer, ExecutionInstance>>() { @Override - public Collection<ExecutionInstance> load(EntityClusterID id) throws Exception { + public SortedMap<Integer, ExecutionInstance> load(EntityClusterID id) throws Exception { List<InstanceState.STATE> states = new ArrayList<InstanceState.STATE>(); states.add(InstanceState.STATE.READY); - List<ExecutionInstance> readyInstances = new ArrayList<>(); + SortedMap<Integer, ExecutionInstance> readyInstances = Collections.synchronizedSortedMap( + new TreeMap<Integer, ExecutionInstance>()); // TODO : Limit it to no. of instances that can be run in parallel. for (InstanceState state : STATE_STORE.getExecutionInstances(id, states)) { - readyInstances.add(state.getInstance()); + readyInstances.put(state.getInstance().getInstanceSequence(), state.getInstance()); } return readyInstances; } @@ -184,7 +185,7 @@ public class SchedulerService implements FalconNotificationService, Notification if (event.getType() == EventType.JOB_COMPLETED) { try { ID targetID = event.getTarget(); - List<ExecutionInstance> instances = null; + SortedMap<Integer, ExecutionInstance> instances = null; // Check if the instance is awaited. if (targetID instanceof EntityClusterID) { EntityClusterID id = (EntityClusterID) event.getTarget(); @@ -197,19 +198,22 @@ public class SchedulerService implements FalconNotificationService, Notification instances = executorAwaitedInstances.get(id.getEntityClusterID()); } if (instances != null && !instances.isEmpty()) { - ExecutionInstance instance = instances.get(0); - if (instance != null && instance.getAwaitingPredicates() != null) { - for (Predicate predicate : instance.getAwaitingPredicates()) { - if (predicate.getType() == Predicate.TYPE.JOB_COMPLETION) { - // Construct a request object - NotificationHandler handler = ReflectionUtils - .getInstanceByClassName(predicate.getClauseValue("handler").toString()); - JobScheduleRequestBuilder requestBuilder = new JobScheduleRequestBuilder( - handler, instance.getId()); - requestBuilder.setInstance(instance); - InstanceRunner runner = new InstanceRunner(requestBuilder.build()); - runQueue.execute(runner); - instances.remove(instance); + synchronized (instances) { + // Order is FIFO.. + ExecutionInstance instance = instances.get(instances.firstKey()); + if (instance != null && instance.getAwaitingPredicates() != null) { + for (Predicate predicate : instance.getAwaitingPredicates()) { + if (predicate.getType() == Predicate.TYPE.JOB_COMPLETION) { + // Construct a request object + NotificationHandler handler = ReflectionUtils + .getInstanceByClassName(predicate.getClauseValue("handler").toString()); + JobScheduleRequestBuilder requestBuilder = new JobScheduleRequestBuilder( + handler, instance.getId()); + requestBuilder.setInstance(instance); + InstanceRunner runner = new InstanceRunner(requestBuilder.build()); + runQueue.execute(runner); + instances.remove(instance.getInstanceSequence()); + } } } } @@ -221,6 +225,11 @@ public class SchedulerService implements FalconNotificationService, Notification } @Override + public PRIORITY getPriority() { + return PRIORITY.MEDIUM; + } + + @Override public void destroy() throws FalconException { runQueue.shutdownNow(); instancesToIgnore.invalidateAll(); @@ -244,10 +253,6 @@ public class SchedulerService implements FalconNotificationService, Notification allowedParallelInstances = EntityUtil.getParallel(instance.getEntity()); } - public int incrementAllowedInstances() { - return ++allowedParallelInstances; - } - private EntityUtil.JOBPRIORITY getPriority(Entity entity) { switch(entity.getEntityType()) { case PROCESS : @@ -319,15 +324,9 @@ public class SchedulerService implements FalconNotificationService, Notification } private void updateExecutorAwaitedInstances(EntityClusterID id) throws ExecutionException { - synchronized (id) { - List<ExecutionInstance> instances = executorAwaitedInstances.get(id); - if (instances == null) { - // Order is FIFO. - instances = new LinkedList<>(); - executorAwaitedInstances.put(id, instances); - } - instances.add(instance); - } + SortedMap<Integer, ExecutionInstance> instances = executorAwaitedInstances.get(id); + // instances will never be null as it is initialized in the loading cache. + instances.put(instance.getInstanceSequence(), instance); } private boolean dependencyCheck() throws FalconException, ExecutionException { http://git-wip-us.apache.org/repos/asf/falcon/blob/3059980c/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java index c43ccf0..5a66518 100644 --- a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java +++ b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java @@ -287,6 +287,11 @@ public class SchedulerServiceTest extends AbstractTestBase { failed = true; } } + + @Override + public PRIORITY getPriority() { + return PRIORITY.MEDIUM; + } }; SchedulerService.JobScheduleRequestBuilder request = (SchedulerService.JobScheduleRequestBuilder) scheduler.createRequestBuilder(failureHandler, instance1.getId()); @@ -317,6 +322,11 @@ public class SchedulerServiceTest extends AbstractTestBase { stateStore.updateExecutionInstance(state); } } + + @Override + public PRIORITY getPriority() { + return PRIORITY.MEDIUM; + } } }
