Repository: falcon
Updated Branches:
  refs/heads/master dfcdb31c3 -> 7ffe1a33b


FALCON-1114 Oozie findBundles lists a directory and tries to match with the 
bundle's appPath. Contributed by Pallavi Rao.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7ffe1a33
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7ffe1a33
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7ffe1a33

Branch: refs/heads/master
Commit: 7ffe1a33b6f3e8d4005e1bee89e9f91d7db33e03
Parents: dfcdb31
Author: Ajay Yadava <[email protected]>
Authored: Mon Jun 15 09:41:08 2015 +0530
Committer: Ajay Yadava <[email protected]>
Committed: Mon Jun 15 09:41:08 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 ++
 .../org/apache/falcon/entity/EntityUtil.java    | 30 ++++++----------
 .../apache/falcon/entity/EntityUtilTest.java    | 36 ++++++++++++++++++++
 .../workflow/engine/OozieWorkflowEngine.java    | 13 +------
 4 files changed, 50 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/7ffe1a33/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c0bbb78..e0c4333 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,9 @@ Trunk (Unreleased)
     FALCON-1039 Add instance dependency API in falcon(Ajay Yadava)
 
   IMPROVEMENTS
+    FALCON-1114 Oozie findBundles lists a directory and tries to match with the bundle's appPath
+    (Pallavi Rao via Ajay Yadava)
+    
     FALCON-1207 Falcon checkstyle allows wildcard imports(Pallavi Rao via Ajay Yadava)
     
     FALCON-1147 Allow _ in the names for name value pair(Sowmya Ramesh via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/7ffe1a33/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 7ebf39e..f4f266a 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -45,14 +45,11 @@ import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.resource.EntityList;
 import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.util.RuntimeProperties;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.text.DateFormat;
@@ -606,27 +603,20 @@ public final class EntityUtil {
             md5(clusterView) + "_" + String.valueOf(System.currentTimeMillis()));
     }
 
-    //Returns all staging paths for the entity
-    public static FileStatus[] getAllStagingPaths(org.apache.falcon.entity.v0.cluster.Cluster cluster,
-                                                  Entity entity) throws FalconException {
-        Path basePath = getBaseStagingPath(cluster, entity);
-        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                ClusterHelper.getConfiguration(cluster));
+    // Given an entity and a cluster, determines if the supplied path is the staging path for that entity.
+    public static boolean isStagingPath(Cluster cluster,
+                                        Entity entity, Path path) throws FalconException {
+        String basePath = new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.STAGING)
+                .getPath()).toUri().getPath();
         try {
-            return fs.listStatus(basePath, new PathFilter() {
-                @Override
-                public boolean accept(Path path) {
-                    return !path.getName().equals("logs");
-                }
-            });
-
-        } catch (FileNotFoundException e) {
-            LOG.info("Staging path " + basePath + " doesn't exist, entity is not scheduled");
-            //Staging path doesn't exist if entity is not scheduled
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                    ClusterHelper.getConfiguration(cluster));
+            String pathString = path.toUri().getPath();
+            String entityPath = entity.getEntityType().name().toLowerCase() + "/" + entity.getName();
+            return fs.exists(path) && pathString.startsWith(basePath) && pathString.contains(entityPath);
         } catch (IOException e) {
             throw new FalconException(e);
         }
-        return null;
     }
 
     public static LateProcess getLateProcess(Entity entity)

http://git-wip-us.apache.org/repos/asf/falcon/blob/7ffe1a33/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
index bfdb9f8..cfdc84d 100644
--- a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
@@ -20,6 +20,9 @@ package org.apache.falcon.entity;
 
 import org.apache.falcon.Pair;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.parser.ClusterEntityParser;
+import org.apache.falcon.entity.parser.EntityParserFactory;
+import org.apache.falcon.entity.parser.ProcessEntityParser;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
@@ -27,10 +30,14 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LateArrival;
 import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import java.io.InputStream;
 import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -288,4 +295,33 @@ public class EntityUtilTest extends AbstractTestBase {
         };
     }
 
+    @Test(dataProvider = "bundlePaths")
+    public void testIsStagingPath(Path path, boolean createPath, boolean expected) throws Exception {
+        ClusterEntityParser parser = (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER);
+        InputStream stream = this.getClass().getResourceAsStream(CLUSTER_XML);
+        org.apache.falcon.entity.v0.cluster.Cluster cluster = parser.parse(stream);
+
+        ProcessEntityParser processParser = (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS);
+        stream = this.getClass().getResourceAsStream(PROCESS_XML);
+        Process process = processParser.parse(stream);
+
+        FileSystem fs = HadoopClientFactory.get().
+                createFalconFileSystem(ClusterHelper.getConfiguration(cluster));
+        if (createPath && !fs.exists(path)) {
+            fs.create(path);
+        }
+
+        Assert.assertEquals(EntityUtil.isStagingPath(cluster, process, path), expected);
+    }
+
+    @DataProvider(name = "bundlePaths")
+    public Object[][] getBundlePaths() {
+        return new Object[][] {
+            {new Path("/projects/falcon/staging/ivory/workflows/process/sample/"), true, true},
+            {new Path("/projects/falcon/staging/falcon/workflows/process/sample/"), true, true},
+            {new Path("/projects/abc/falcon/workflows/process/sample/"), true, false},
+            {new Path("/projects/falcon/staging/falcon/workflows/process/test-process/"), false, false},
+            {new Path("/projects/falcon/staging/falcon/workflows/process/test-process/"), true, false},
+        };
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7ffe1a33/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 02dcb2d..4085b8f 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -48,7 +48,6 @@ import org.apache.falcon.update.UpdateHelper;
 import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.client.BundleJob;
@@ -276,23 +275,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     //Return all bundles for the entity in the requested cluster
     private List<BundleJob> findBundles(Entity entity, String clusterName) throws FalconException {
         Cluster cluster = STORE.get(EntityType.CLUSTER, clusterName);
-        FileStatus[] stgPaths = EntityUtil.getAllStagingPaths(cluster, entity);
         List<BundleJob> filteredJobs = new ArrayList<BundleJob>();
-        if (stgPaths == null) {
-            return filteredJobs;
-        }
-
-        List<String> appPaths = new ArrayList<String>();
-        for (FileStatus file : stgPaths) {
-            appPaths.add(file.getPath().toUri().getPath());
-        }
-
         try {
             List<BundleJob> jobs = OozieClientFactory.get(cluster.getName()).getBundleJobsInfo(OozieClient.FILTER_NAME
                 + "=" + EntityUtil.getWorkflowName(entity) + ";", 0, 256);
             if (jobs != null) {
                 for (BundleJob job : jobs) {
-                    if (appPaths.contains(new Path(job.getAppPath()).toUri().getPath())) {
+                    if (EntityUtil.isStagingPath(cluster, entity, new Path(job.getAppPath()))) {
                         //Load bundle as coord info is not returned in getBundleJobsInfo()
                         BundleJob bundle = getBundleInfo(clusterName, job.getId());
                         filteredJobs.add(bundle);

Reply via email to