Repository: falcon Updated Branches: refs/heads/master 20f2acec7 -> 7624b2c8b
FALCON-1371 Status of scheduled Process entity is shown as submitted in corner case. Contributed by Balu Vellanki. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7624b2c8 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7624b2c8 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7624b2c8 Branch: refs/heads/master Commit: 7624b2c8b55e241eef60391534c39c719e0702dd Parents: 20f2ace Author: Sowmya Ramesh <[email protected]> Authored: Wed Sep 9 15:25:27 2015 -0700 Committer: Sowmya Ramesh <[email protected]> Committed: Wed Sep 9 15:25:27 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 3 + common/pom.xml | 14 ++-- .../workflow/engine/AbstractWorkflowEngine.java | 16 ++++- .../workflow/engine/OozieWorkflowEngine.java | 76 ++++++++++++++++---- .../falcon/resource/AbstractEntityManager.java | 20 +++--- 5 files changed, 102 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/7624b2c8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index bc1422a..45ead3f 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -20,6 +20,9 @@ Trunk (Unreleased) OPTIMIZATIONS BUG FIXES + + FALCON-1371 Status of scheduled Process entity is shown as submitted in corner case(Balu Vellanki via Sowmya Ramesh) + FALCON-1402 Validate cmd throws NPE when source cluster and any one of target cluster doesn't have overlapping dates(Pavan Kumar Kolamuri via Ajay Yadava) FALCON-1365 HCatReplication job fails with AccessControlException(Sowmya Ramesh via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/7624b2c8/common/pom.xml ---------------------------------------------------------------------- diff --git a/common/pom.xml b/common/pom.xml index 3a64751..0420b4c 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -51,16 +51,20 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <classifier>tests</classifier> - </dependency> - <dependency> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <classifier>tests</classifier> - </dependency> - <dependency> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> - </dependency> + </dependency> + <dependency> + <groupId>org.apache.oozie</groupId> + <artifactId>oozie-client</artifactId> + </dependency> </dependencies> </profile> </profiles> http://git-wip-us.apache.org/repos/asf/falcon/blob/7624b2c8/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java index 4d45cc7..ea86c2a 100644 --- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java +++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java @@ -24,12 +24,14 @@ import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesSummaryResult; +import org.apache.oozie.client.BundleJob; +import java.util.Date; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.Date; /** @@ -65,7 +67,17 @@ public abstract class AbstractWorkflowEngine { public abstract boolean isActive(Entity entity) throws FalconException; - public abstract boolean isSuspended(Entity entity) throws FalconException; + public abstract boolean isActive(Map<String, BundleJob> bundles) throws FalconException; + + public abstract boolean isSuspended(Map<String, BundleJob> bundles) throws FalconException; + + public abstract boolean isSucceeded(Map<String, BundleJob> bundles) throws FalconException; + + public abstract boolean isFailed(Map<String, BundleJob> bundles) throws FalconException; + + public abstract boolean isKilled(Map<String, BundleJob> bundles) throws FalconException; + + public abstract Map<String, BundleJob> findLatestBundle(Entity entity) throws FalconException; public abstract InstancesResult getRunningInstances(Entity entity, List<LifeCycle> lifeCycles) throws FalconException; http://git-wip-us.apache.org/repos/asf/falcon/blob/7624b2c8/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index f8b7764..5f79ca1 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -106,6 +106,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { Arrays.asList(Job.Status.PREPSUSPENDED, Job.Status.SUSPENDED, Status.SUSPENDEDWITHERROR); private static final List<Job.Status> BUNDLE_RUNNING_STATUS = Arrays.asList(Job.Status.PREP, Job.Status.RUNNING, Job.Status.RUNNINGWITHERROR); + private static final List<Job.Status> BUNDLE_SUCCEEDED_STATUS = Arrays.asList(Job.Status.SUCCEEDED); + private static final List<Job.Status> BUNDLE_FAILED_STATUS = Arrays.asList(Job.Status.FAILED, + Job.Status.DONEWITHERROR); + private static final List<Job.Status> BUNDLE_KILLED_STATUS = Arrays.asList(Job.Status.KILLED); private static final List<Job.Status> BUNDLE_SUSPEND_PRECOND = Arrays.asList(Job.Status.PREP, Job.Status.RUNNING, Job.Status.DONEWITHERROR); @@ -239,26 +243,52 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { @Override public boolean isActive(Entity entity) throws FalconException { - return isBundleInState(entity, BundleStatus.ACTIVE); + return isBundleInState(findLatestBundle(entity), BundleStatus.ACTIVE); } @Override - public boolean isSuspended(Entity entity) throws FalconException { - return isBundleInState(entity, BundleStatus.SUSPENDED); + public boolean isActive(Map<String, BundleJob> bundles) throws FalconException { + return isBundleInState(bundles, BundleStatus.ACTIVE); + } + + @Override + public boolean isSuspended(Map<String, BundleJob> bundles) throws FalconException { + return isBundleInState(bundles, BundleStatus.SUSPENDED); + } + + @Override + public boolean isFailed(Map<String, BundleJob> bundles) throws FalconException { + return isBundleInState(bundles, BundleStatus.FAILED); + } + + @Override + public boolean isKilled(Map<String, BundleJob> bundles) throws FalconException { + return isBundleInState(bundles, BundleStatus.KILLED); + } + + @Override + public boolean isSucceeded(Map<String, BundleJob> bundles) throws FalconException { + return isBundleInState(bundles, BundleStatus.SUCCEEDED); } private enum BundleStatus { - ACTIVE, RUNNING, SUSPENDED + ACTIVE, RUNNING, SUSPENDED, FAILED, KILLED, SUCCEEDED } - private boolean isBundleInState(Entity entity, BundleStatus status) throws FalconException { + private boolean isBundleInState(Map<String, BundleJob> bundles, + BundleStatus status) throws FalconException { - Map<String, BundleJob> bundles = findLatestBundle(entity); - for (BundleJob bundle : bundles.values()) { - if (bundle == MISSING) {// There is no active bundle - return false; + // After removing MISSING bundles for clusters, if bundles.size() == 0, entity is not scheduled. Return false. + for (Map.Entry<String, BundleJob> clusterBundle : bundles.entrySet()) { + if (clusterBundle.getValue() == MISSING) { // There is no active bundle for this cluster + bundles.remove(clusterBundle.getKey()); } + } + if (bundles.size() == 0) { + return false; + } + for (BundleJob bundle : bundles.values()) { switch (status) { case ACTIVE: if (!BUNDLE_ACTIVE_STATUS.contains(bundle.getStatus())) { @@ -277,8 +307,27 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { return false; } break; + + case FAILED: + if (!BUNDLE_FAILED_STATUS.contains(bundle.getStatus())) { + return false; + } + break; + + case KILLED: + if (!BUNDLE_KILLED_STATUS.contains(bundle.getStatus())) { + return false; + } + break; + + case SUCCEEDED: + if (!BUNDLE_SUCCEEDED_STATUS.contains(bundle.getStatus())) { + return false; + } + break; default: } + LOG.debug("Bundle {} is in state {}", bundle.getAppName(), status.name()); } return true; } @@ -296,7 +345,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { //Load bundle as coord info is not returned in getBundleJobsInfo() BundleJob bundle = getBundleInfo(clusterName, job.getId()); filteredJobs.add(bundle); - LOG.debug("Found bundle {} with app path {}", job.getId(), job.getAppPath()); + LOG.debug("Found bundle {} with app path {} and status {}", + job.getId(), job.getAppPath(), job.getStatus()); } } } @@ -317,11 +367,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } //Return latest bundle(last created) for the entity for each cluster - private Map<String, BundleJob> findLatestBundle(Entity entity) throws FalconException { + @Override + public Map<String, BundleJob> findLatestBundle(Entity entity) throws FalconException { Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity); Map<String, BundleJob> jobMap = new HashMap<String, BundleJob>(); for (String cluster : clusters) { - jobMap.put(cluster, findLatestBundle(entity, cluster)); + BundleJob bundleJob = findLatestBundle(entity, cluster); + jobMap.put(cluster, bundleJob); } return jobMap; } http://git-wip-us.apache.org/repos/asf/falcon/blob/7624b2c8/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java index 06ab4d9..63c5d39 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java @@ -48,6 +48,7 @@ import org.apache.falcon.util.RuntimeProperties; import org.apache.falcon.workflow.WorkflowEngineFactory; import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; import org.apache.hadoop.io.IOUtils; +import org.apache.oozie.client.BundleJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -512,7 +513,7 @@ public abstract class AbstractEntityManager { } private enum EntityStatus { - SUBMITTED, SUSPENDED, RUNNING + SUBMITTED, SUSPENDED, RUNNING, SUCCEEDED, KILLED, FAILED } /** @@ -541,20 +542,23 @@ public abstract class AbstractEntityManager { } protected EntityStatus getStatus(Entity entity, EntityType type) throws FalconException { - EntityStatus status; + EntityStatus status = EntityStatus.SUBMITTED; + Map<String, BundleJob> latestBundles = workflowEngine.findLatestBundle(entity); if (type.isSchedulable()) { - if (workflowEngine.isActive(entity)) { - if (workflowEngine.isSuspended(entity)) { + if (workflowEngine.isActive(latestBundles)) { + if (workflowEngine.isSuspended(latestBundles)) { status = EntityStatus.SUSPENDED; } else { status = EntityStatus.RUNNING; } - } else { - status = EntityStatus.SUBMITTED; + } else if (workflowEngine.isSucceeded(latestBundles)) { + status = EntityStatus.SUCCEEDED; + } else if (workflowEngine.isKilled(latestBundles)) { + status = EntityStatus.KILLED; + } else if (workflowEngine.isFailed(latestBundles)) { + status = EntityStatus.FAILED; } - } else { - status = EntityStatus.SUBMITTED; } return status; }
