Repository: falcon Updated Branches: refs/heads/master b806b32fd -> 52ce7f833
FALCON-1442 Contract of WorkflowEngine API broken. Contributed by Balu Vellanki. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/52ce7f83 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/52ce7f83 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/52ce7f83 Branch: refs/heads/master Commit: 52ce7f8332e4f3dc9b0b644cb8cef3453ae7e1da Parents: b806b32 Author: Ajay Yadava <[email protected]> Authored: Fri Sep 18 13:48:02 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Fri Sep 18 13:48:02 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ common/pom.xml | 4 --- .../workflow/engine/AbstractWorkflowEngine.java | 14 ++-------- .../workflow/engine/OozieWorkflowEngine.java | 29 ++++++-------------- .../falcon/resource/AbstractEntityManager.java | 16 ++++------- 5 files changed, 17 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/52ce7f83/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 25f02f0..db11e40 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -27,6 +27,8 @@ Trunk (Unreleased) OPTIMIZATIONS BUG FIXES + FALCON-1442 Contract of WorkflowEngine API broken(Balu Vellanki via Ajay Yadava) + FALCON-1460 Move getHiveCredentials method to ClusterHelper(Ajay Yadava via Sowmya Ramesh) FALCON-1342 Do not allow duplicate properties in entities(Balu Vellanki via Sowmya Ramesh) http://git-wip-us.apache.org/repos/asf/falcon/blob/52ce7f83/common/pom.xml ---------------------------------------------------------------------- diff --git a/common/pom.xml b/common/pom.xml index 0420b4c..37eb2d1 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -61,10 +61,6 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> </dependency> - <dependency> - <groupId>org.apache.oozie</groupId> - <artifactId>oozie-client</artifactId> - </dependency> </dependencies> </profile> </profiles> http://git-wip-us.apache.org/repos/asf/falcon/blob/52ce7f83/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java index 265106b..78af6b2 100644 --- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java +++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java @@ -24,12 +24,10 @@ import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesSummaryResult; -import org.apache.oozie.client.BundleJob; import java.util.Date; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.Set; @@ -68,17 +66,9 @@ public abstract class AbstractWorkflowEngine { public abstract boolean isActive(Entity entity) throws FalconException; - public abstract boolean isActive(Map<String, BundleJob> bundles) throws FalconException; + public abstract boolean isSuspended(Entity entity) throws FalconException; - public abstract boolean isSuspended(Map<String, BundleJob> bundles) throws FalconException; - - public abstract boolean isSucceeded(Map<String, BundleJob> bundles) throws FalconException; - - public abstract boolean isFailed(Map<String, BundleJob> bundles) throws FalconException; - - public abstract boolean isKilled(Map<String, BundleJob> bundles) throws FalconException; - - public abstract Map<String, BundleJob> findLatestBundle(Entity entity) throws FalconException; + public abstract boolean isCompleted(Entity entity) throws FalconException; public abstract InstancesResult getRunningInstances(Entity entity, List<LifeCycle> lifeCycles) throws FalconException; http://git-wip-us.apache.org/repos/asf/falcon/blob/52ce7f83/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index 96661ad..0441f7c 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -246,28 +246,16 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } @Override - public boolean isActive(Map<String, BundleJob> bundles) throws FalconException { - return isBundleInState(bundles, BundleStatus.ACTIVE); + public boolean isSuspended(Entity entity) throws FalconException { + return isBundleInState(findLatestBundle(entity), BundleStatus.SUSPENDED); } @Override - public boolean isSuspended(Map<String, BundleJob> bundles) throws FalconException { - return isBundleInState(bundles, BundleStatus.SUSPENDED); - } - - @Override - public boolean isFailed(Map<String, BundleJob> bundles) throws FalconException { - return isBundleInState(bundles, BundleStatus.FAILED); - } - - @Override - public boolean isKilled(Map<String, BundleJob> bundles) throws FalconException { - return isBundleInState(bundles, BundleStatus.KILLED); - } - - @Override - public boolean isSucceeded(Map<String, BundleJob> bundles) throws FalconException { - return isBundleInState(bundles, BundleStatus.SUCCEEDED); + public boolean isCompleted(Entity entity) throws FalconException { + Map<String, BundleJob> bundles = findLatestBundle(entity); + return (isBundleInState(bundles, BundleStatus.SUCCEEDED) + || isBundleInState(bundles, BundleStatus.FAILED) + || isBundleInState(bundles, BundleStatus.KILLED)); } private enum BundleStatus { @@ -366,8 +354,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } //Return latest bundle(last created) for the entity for each cluster - @Override - public Map<String, BundleJob> findLatestBundle(Entity entity) throws FalconException { + private Map<String, BundleJob> findLatestBundle(Entity entity) throws FalconException { Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity); Map<String, BundleJob> jobMap = new HashMap<String, BundleJob>(); for (String cluster : clusters) { http://git-wip-us.apache.org/repos/asf/falcon/blob/52ce7f83/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java index b867055..bed0b6c 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java @@ -48,7 +48,6 @@ import org.apache.falcon.util.RuntimeProperties; import org.apache.falcon.workflow.WorkflowEngineFactory; import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; import org.apache.hadoop.io.IOUtils; -import org.apache.oozie.client.BundleJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -515,7 +514,7 @@ public abstract class AbstractEntityManager { } private enum EntityStatus { - SUBMITTED, SUSPENDED, RUNNING, SUCCEEDED, KILLED, FAILED + SUBMITTED, SUSPENDED, RUNNING, COMPLETED } /** @@ -545,21 +544,16 @@ public abstract class AbstractEntityManager { protected EntityStatus getStatus(Entity entity, EntityType type) throws FalconException { EntityStatus status = EntityStatus.SUBMITTED; - Map<String, BundleJob> latestBundles = workflowEngine.findLatestBundle(entity); if (type.isSchedulable()) { - if (workflowEngine.isActive(latestBundles)) { - if (workflowEngine.isSuspended(latestBundles)) { + if (workflowEngine.isActive(entity)) { + if (workflowEngine.isSuspended(entity)) { status = EntityStatus.SUSPENDED; } else { status = EntityStatus.RUNNING; } - } else if (workflowEngine.isSucceeded(latestBundles)) { - status = EntityStatus.SUCCEEDED; - } else if (workflowEngine.isKilled(latestBundles)) { - status = EntityStatus.KILLED; - } else if (workflowEngine.isFailed(latestBundles)) { - status = EntityStatus.FAILED; + } else if (workflowEngine.isCompleted(entity)) { + status = EntityStatus.COMPLETED; } } return status;
