Repository: falcon Updated Branches: refs/heads/master cf492300e -> 6c8b33fd5
FALCON-1483 Add Utils to common to support native scheduler. Contributed by Pallavi Rao. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/6c8b33fd Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/6c8b33fd Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/6c8b33fd Branch: refs/heads/master Commit: 6c8b33fd582f05680d8ea56efa784d26fc936428 Parents: cf49230 Author: Ajay Yadava <[email protected]> Authored: Thu Sep 24 11:11:10 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Thu Sep 24 11:11:10 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../org/apache/falcon/entity/EntityUtil.java | 44 ++++++++++++++++++- .../apache/falcon/entity/EntityUtilTest.java | 45 ++++++++++++++++++++ 3 files changed, 90 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/6c8b33fd/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a54fbcf..d8d5a2a 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,8 @@ Trunk (Unreleased) FALCON-1027 Falcon proxy user support(Sowmya Ramesh) IMPROVEMENTS + FALCON-1483 Add Utils to common to support native scheduler(Pallavi Rao via Ajay Yadava) + FALCON-1417 Make validity end date optional for feed / process(Pragya Mittal via Ajay Yadava) FALCON-1434 Enhance schedule API to accept key-value properties(Pallavi Rao) http://git-wip-us.apache.org/repos/asf/falcon/blob/6c8b33fd/common/src/main/java/org/apache/falcon/entity/EntityUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java index ad41674..3ab9339 100644 --- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java +++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java @@ -47,8 +47,10 @@ import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.resource.EntityList; import org.apache.falcon.util.DeploymentUtil; import org.apache.falcon.util.RuntimeProperties; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,6 +87,7 @@ public final class EntityUtil { private static final long ONE_MS = 1; public static final String SUCCEEDED_FILE_NAME = "_SUCCESS"; + private static final String STAGING_DIR_NAME_SEPARATOR = "_"; private EntityUtil() {} @@ -357,6 +360,10 @@ public final class EntityUtil { return -1; } + if (tz == null) { + tz = TimeZone.getTimeZone("UTC"); + } + Calendar startCal = Calendar.getInstance(tz); startCal.setTime(startTime); @@ -657,13 +664,48 @@ public final class EntityUtil { "falcon/workflows/" + entity.getEntityType().name().toLowerCase() + "/" + entity.getName()); } + /** + * Gets the latest staging path for an entity on a cluster, based on the dir name(that contains timestamp). + * @param cluster + * @param entity + * @return + * @throws FalconException + */ + public static Path getLatestStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, final Entity entity) + throws FalconException { + Path basePath = getBaseStagingPath(cluster, entity); + FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( + ClusterHelper.getConfiguration(cluster)); + try { + final String md5 = md5(getClusterView(entity, cluster.getName())); + FileStatus[] files = fs.listStatus(basePath, new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith(md5); + } + }); + if (files != null && files.length != 0) { + // Find the latest directory using the timestamp used in the dir name + // These files will vary only in ts suffix (as we have filtered out using a common md5 hash), + // hence, sorting will be on timestamp. + // FileStatus compares on Path and hence the latest will be at the end after sorting. + Arrays.sort(files); + return files[files.length - 1].getPath(); + } + throw new FalconException("No staging directories found for entity " + entity.getName() + " on cluster " + + cluster.getName()); + } catch (Exception e) { + throw new FalconException("Unable get listing for " + basePath.toString(), e); + } + } + //Creates new staging path for entity schedule/update //Staging path containd md5 of the cluster view of the entity. This is required to check if update is required public static Path getNewStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity) throws FalconException { Entity clusterView = getClusterView(entity, cluster.getName()); return new Path(getBaseStagingPath(cluster, entity), - md5(clusterView) + "_" + String.valueOf(System.currentTimeMillis())); + md5(clusterView) + STAGING_DIR_NAME_SEPARATOR + String.valueOf(System.currentTimeMillis())); } // Given an entity and a cluster, determines if the supplied path is the staging path for that entity. http://git-wip-us.apache.org/repos/asf/falcon/blob/6c8b33fd/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java index d022bae..c87449c 100644 --- a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java +++ b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java @@ -38,6 +38,7 @@ import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.io.IOException; import java.io.InputStream; import java.text.DateFormat; import java.text.ParseException; @@ -405,4 +406,48 @@ public class EntityUtilTest extends AbstractTestBase { {":value"}, }; } + + @Test + public void testGetLatestStagingPath() throws FalconException, IOException { + ClusterEntityParser parser = (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER); + InputStream stream = this.getClass().getResourceAsStream(CLUSTER_XML); + org.apache.falcon.entity.v0.cluster.Cluster cluster = parser.parse(stream); + + ProcessEntityParser processParser = (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS); + stream = this.getClass().getResourceAsStream(PROCESS_XML); + Process process = processParser.parse(stream); + process.setName("staging-test"); + + String md5 = EntityUtil.md5(EntityUtil.getClusterView(process, "testCluster")); + FileSystem fs = HadoopClientFactory.get(). + createFalconFileSystem(ClusterHelper.getConfiguration(cluster)); + + String basePath = "/projects/falcon/staging/falcon/workflows/process/staging-test/"; + Path[] paths = { + new Path(basePath + "5a8100dc460b44db2e7bfab84b24cb92_1436441045003"), + new Path(basePath + "6b3a1b6c7cf9de62c78b125415ffb70c_1436504488677"), + new Path(basePath + md5 + "_1436344303117"), + new Path(basePath + md5 + "_1436347924846"), + new Path(basePath + md5 + "_1436357052992"), + new Path(basePath + "logs"), + new Path(basePath + "random_dir"), + }; + + // Ensure exception is thrown when there are no staging dirs. + fs.delete(new Path(basePath), true); + try { + EntityUtil.getLatestStagingPath(cluster, process); + Assert.fail("Exception expected"); + } catch (FalconException e) { + // Do nothing + } + + // Now create paths + for (Path path : paths) { + fs.create(path); + } + + // Ensure latest is returned. + Assert.assertEquals(EntityUtil.getLatestStagingPath(cluster, process).getName(), md5 + "_1436357052992"); + } }
