FALCON-1681 Improve logging for idempotent behaviour while scheduling entities. Contributed by Pallavi Rao.
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/5e270741 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/5e270741 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/5e270741 Branch: refs/heads/master Commit: 5e270741ba7bce0c898972ad96b9bd61742af8bc Parents: e64d637 Author: Ajay Yadava <[email protected]> Authored: Wed Dec 23 09:46:20 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Wed Dec 23 09:46:20 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../falcon/execution/FalconExecutionService.java | 19 +++++++++++++++---- 2 files changed, 17 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/5e270741/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 27e2742..aa8d48f 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -86,6 +86,8 @@ Trunk (Unreleased) OPTIMIZATIONS BUG FIXES + FALCON-1681 Improve logging for idempotent behaviour while scheduling entities(Pallavi Rao via Ajay Yadava) + FALCON-1680 Error message is not intuitive when entity schedule fails(Pallavi Rao via Ajay Yadava) FALCON-1644 Retention : Some feed instances are never deleted by retention jobs(Balu Vellanki via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/5e270741/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java index 01208d6..f45ec98 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java @@ -145,10 +145,14 @@ public final class FalconExecutionService implements FalconService, EntityStateC @Override public void onSchedule(Entity entity) throws FalconException { for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) { - EntityExecutor executor = createEntityExecutor(entity, cluster); EntityClusterID id = new EntityClusterID(entity, cluster); + if (executors.containsKey(id)) { + LOG.info("Entity {} is already scheduled on cluster {}.", id, cluster); + continue; + } + EntityExecutor executor = createEntityExecutor(entity, cluster); executors.put(id, executor); - LOG.info("Scheduling entity {}.", id); + LOG.info("Scheduling entity {} on cluster {}.", id, cluster); executor.schedule(); } } @@ -156,7 +160,13 @@ public final class FalconExecutionService implements FalconService, EntityStateC @Override public void onSuspend(Entity entity) throws FalconException { for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) { + EntityClusterID id = new EntityClusterID(entity, cluster); + if (!executors.containsKey(id)) { + LOG.info("Entity {} is already suspended on cluster {}.", id, cluster); + continue; + } EntityExecutor executor = getEntityExecutor(entity, cluster); + LOG.info("Suspending entity, {} on cluster {}.", id, cluster); executor.suspendAll(); } } @@ -164,10 +174,11 @@ public final class FalconExecutionService implements FalconService, EntityStateC @Override public void onResume(Entity entity) throws FalconException { for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) { + EntityClusterID id = new EntityClusterID(entity, cluster); + // Create even if it exists in cache, as the instances need to be refreshed. EntityExecutor executor = createEntityExecutor(entity, cluster); executors.put(new EntityClusterID(entity, cluster), executor); - LOG.info("Resuming entity, {} of type {} on cluster {}.", entity.getName(), - entity.getEntityType(), cluster); + LOG.info("Resuming entity, {} on cluster {}.", id, cluster); executor.resumeAll(); } }
