Repository: falcon Updated Branches: refs/heads/master dfcdb31c3 -> 7ffe1a33b
FALCON-1114 Oozie findBundles lists a directory and tries to match with the bundle's appPath. Contributed by Pallavi Rao. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7ffe1a33 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7ffe1a33 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7ffe1a33 Branch: refs/heads/master Commit: 7ffe1a33b6f3e8d4005e1bee89e9f91d7db33e03 Parents: dfcdb31 Author: Ajay Yadava <[email protected]> Authored: Mon Jun 15 09:41:08 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Mon Jun 15 09:41:08 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 3 ++ .../org/apache/falcon/entity/EntityUtil.java | 30 ++++++---------- .../apache/falcon/entity/EntityUtilTest.java | 36 ++++++++++++++++++++ .../workflow/engine/OozieWorkflowEngine.java | 13 +------ 4 files changed, 50 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/7ffe1a33/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c0bbb78..e0c4333 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,9 @@ Trunk (Unreleased) FALCON-1039 Add instance dependency API in falcon(Ajay Yadava) IMPROVEMENTS + FALCON-1114 Oozie findBundles lists a directory and tries to match with the bundle's appPath + (Pallavi Rao via Ajay Yadava) + FALCON-1207 Falcon checkstyle allows wildcard imports(Pallavi Rao via Ajay Yadava) FALCON-1147 Allow _ in the names for name value pair(Sowmya Ramesh via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/7ffe1a33/common/src/main/java/org/apache/falcon/entity/EntityUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java index 7ebf39e..f4f266a 100644 --- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java +++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java @@ -45,14 +45,11 @@ import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.resource.EntityList; import org.apache.falcon.util.DeploymentUtil; import org.apache.falcon.util.RuntimeProperties; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.Method; import java.text.DateFormat; @@ -606,27 +603,20 @@ public final class EntityUtil { md5(clusterView) + "_" + String.valueOf(System.currentTimeMillis())); } - //Returns all staging paths for the entity - public static FileStatus[] getAllStagingPaths(org.apache.falcon.entity.v0.cluster.Cluster cluster, - Entity entity) throws FalconException { - Path basePath = getBaseStagingPath(cluster, entity); - FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( - ClusterHelper.getConfiguration(cluster)); + // Given an entity and a cluster, determines if the supplied path is the staging path for that entity. + public static boolean isStagingPath(Cluster cluster, + Entity entity, Path path) throws FalconException { + String basePath = new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.STAGING) + .getPath()).toUri().getPath(); try { - return fs.listStatus(basePath, new PathFilter() { - @Override - public boolean accept(Path path) { - return !path.getName().equals("logs"); - } - }); - - } catch (FileNotFoundException e) { - LOG.info("Staging path " + basePath + " doesn't exist, entity is not scheduled"); - //Staging path doesn't exist if entity is not scheduled + FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( + ClusterHelper.getConfiguration(cluster)); + String pathString = path.toUri().getPath(); + String entityPath = entity.getEntityType().name().toLowerCase() + "/" + entity.getName(); + return fs.exists(path) && pathString.startsWith(basePath) && pathString.contains(entityPath); } catch (IOException e) { throw new FalconException(e); } - return null; } public static LateProcess getLateProcess(Entity entity) http://git-wip-us.apache.org/repos/asf/falcon/blob/7ffe1a33/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java index bfdb9f8..cfdc84d 100644 --- a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java +++ b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java @@ -20,6 +20,9 @@ package org.apache.falcon.entity; import org.apache.falcon.Pair; import org.apache.falcon.FalconException; +import org.apache.falcon.entity.parser.ClusterEntityParser; +import org.apache.falcon.entity.parser.EntityParserFactory; +import org.apache.falcon.entity.parser.ProcessEntityParser; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.entity.v0.SchemaHelper; @@ -27,10 +30,14 @@ import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.LateArrival; import org.apache.falcon.entity.v0.process.Cluster; import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.io.InputStream; import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; @@ -288,4 +295,33 @@ public class EntityUtilTest extends AbstractTestBase { }; } + @Test(dataProvider = "bundlePaths") + public void testIsStagingPath(Path path, boolean createPath, boolean expected) throws Exception { + ClusterEntityParser parser = (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER); + InputStream stream = this.getClass().getResourceAsStream(CLUSTER_XML); + org.apache.falcon.entity.v0.cluster.Cluster cluster = parser.parse(stream); + + ProcessEntityParser processParser = (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS); + stream = this.getClass().getResourceAsStream(PROCESS_XML); + Process process = processParser.parse(stream); + + FileSystem fs = HadoopClientFactory.get(). + createFalconFileSystem(ClusterHelper.getConfiguration(cluster)); + if (createPath && !fs.exists(path)) { + fs.create(path); + } + + Assert.assertEquals(EntityUtil.isStagingPath(cluster, process, path), expected); + } + + @DataProvider(name = "bundlePaths") + public Object[][] getBundlePaths() { + return new Object[][] { + {new Path("/projects/falcon/staging/ivory/workflows/process/sample/"), true, true}, + {new Path("/projects/falcon/staging/falcon/workflows/process/sample/"), true, true}, + {new Path("/projects/abc/falcon/workflows/process/sample/"), true, false}, + {new Path("/projects/falcon/staging/falcon/workflows/process/test-process/"), false, false}, + {new Path("/projects/falcon/staging/falcon/workflows/process/test-process/"), true, false}, + }; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/7ffe1a33/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index 02dcb2d..4085b8f 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -48,7 +48,6 @@ import org.apache.falcon.update.UpdateHelper; import org.apache.falcon.util.OozieUtils; import org.apache.falcon.util.RuntimeProperties; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.oozie.client.BundleJob; @@ -276,23 +275,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { //Return all bundles for the entity in the requested cluster private List<BundleJob> findBundles(Entity entity, String clusterName) throws FalconException { Cluster cluster = STORE.get(EntityType.CLUSTER, clusterName); - FileStatus[] stgPaths = EntityUtil.getAllStagingPaths(cluster, entity); List<BundleJob> filteredJobs = new ArrayList<BundleJob>(); - if (stgPaths == null) { - return filteredJobs; - } - - List<String> appPaths = new ArrayList<String>(); - for (FileStatus file : stgPaths) { - appPaths.add(file.getPath().toUri().getPath()); - } - try { List<BundleJob> jobs = OozieClientFactory.get(cluster.getName()).getBundleJobsInfo(OozieClient.FILTER_NAME + "=" + EntityUtil.getWorkflowName(entity) + ";", 0, 256); if (jobs != null) { for (BundleJob job : jobs) { - if (appPaths.contains(new Path(job.getAppPath()).toUri().getPath())) { + if (EntityUtil.isStagingPath(cluster, entity, new Path(job.getAppPath()))) { //Load bundle as coord info is not returned in getBundleJobsInfo() BundleJob bundle = getBundleInfo(clusterName, job.getId()); filteredJobs.add(bundle);
