Repository: falcon Updated Branches: refs/heads/master 28fd15c49 -> b4ade492f
FALCON-1256 ListProcessInstancesTest needs to be stabilized contributed by Raghav Kumar Gautam Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/b4ade492 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/b4ade492 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/b4ade492 Branch: refs/heads/master Commit: b4ade492f7552ccb12d4714e08c77432287e6bf5 Parents: 28fd15c Author: Raghav Kumar Gautam <[email protected]> Authored: Mon Jun 8 10:28:04 2015 -0700 Committer: Raghav Kumar Gautam <[email protected]> Committed: Mon Jun 8 10:29:00 2015 -0700 ---------------------------------------------------------------------- falcon-regression/CHANGES.txt | 1 + .../falcon/regression/core/util/HadoopUtil.java | 4 +- .../falcon/regression/core/util/OozieUtil.java | 41 ++++---------------- .../lineage/ListProcessInstancesTest.java | 6 ++- 4 files changed, 17 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/b4ade492/falcon-regression/CHANGES.txt ---------------------------------------------------------------------- diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt index 3cd811f..3b1dea6 100644 --- a/falcon-regression/CHANGES.txt +++ b/falcon-regression/CHANGES.txt @@ -92,6 +92,7 @@ Trunk (Unreleased) via Samarth Gupta) IMPROVEMENTS + FALCON-1256 ListProcessInstancesTest needs to be stabilized (Raghav Kumar Gautam) FALCON-1227 Add logMover check in FeedReplication test(Pragya M via Samarth G) http://git-wip-us.apache.org/repos/asf/falcon/blob/b4ade492/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java 
b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java index bb2fdfb..2a5566a 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java @@ -405,7 +405,9 @@ public final class HadoopUtil { public static void createFolders(FileSystem fs, final String folderPrefix, List<String> folderList) throws IOException { for (final String folder : folderList) { - fs.mkdirs(new Path(cutProtocol(folderPrefix + folder))); + final String pathString = cutProtocol(folderPrefix + folder); + LOGGER.info("Creating " + fs.getUri() + "/" + pathString); + fs.mkdirs(new Path(pathString)); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/b4ade492/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java index ef7d887..f327178 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java @@ -68,11 +68,6 @@ public final class OozieUtil { return client.getBundleJobsInfo(filter, start, len); } - public static List<String> getBundleIds(OozieClient client, String filter, int start, int len) - throws OozieClientException { - return getBundleIds(getBundles(client, filter, start, len)); - } - public static List<String> getBundleIds(List<BundleJob> bundles) { List<String> ids = new ArrayList<>(); for (BundleJob bundle : bundles) { @@ -82,11 +77,6 @@ public final class OozieUtil { return ids; } - public static List<Job.Status> 
getBundleStatuses(OozieClient client, String filter, int start, - int len) throws OozieClientException { - return getBundleStatuses(getBundles(client, filter, start, len)); - } - public static List<Job.Status> getBundleStatuses(List<BundleJob> bundles) { List<Job.Status> statuses = new ArrayList<>(); for (BundleJob bundle : bundles) { @@ -180,7 +170,7 @@ public final class OozieUtil { EntityType entityType) throws OozieClientException { String filter = String.format("name=FALCON_%s_%s", entityType, processName); - List<Job.Status> statuses = getBundleStatuses(client, filter, 0, 10); + List<Job.Status> statuses = getBundleStatuses(getBundles(client, filter, 0, 10)); if (statuses.isEmpty()) { return null; } else { @@ -192,7 +182,7 @@ public final class OozieUtil { EntityType entityType) throws OozieClientException { String filter = "name=FALCON_" + entityType + "_" + entityName; - return getBundleIds(client, filter, 0, 10); + return getBundleIds(getBundles(client, filter, 0, 10)); } public static List<DateTime> getStartTimeForRunningCoordinators(ColoHelper prismHelper, @@ -445,8 +435,8 @@ public final class OozieUtil { final OozieClient oozieClient = helper.getClusterHelper().getOozieClient(); String bundleID = getSequenceBundleID(oozieClient, entityName, type, bundleNumber); List<CoordinatorJob> coords = oozieClient.getBundleJobInfo(bundleID).getCoordinators(); - HadoopUtil.createFolders(helper.getClusterHelper().getHadoopFS(), helper.getPrefix(), - getMissingDependenciesForInstance(oozieClient, coords, instanceNumber)); + final List<String> missingDependencies = getMissingDependenciesForInstance(oozieClient, coords, instanceNumber); + HadoopUtil.createFolders(helper.getClusterHelper().getHadoopFS(), helper.getPrefix(), missingDependencies); } private static List<String> getMissingDependenciesForInstance(OozieClient oozieClient, @@ -557,25 +547,10 @@ public final class OozieUtil { public static String getSequenceBundleID(OozieClient oozieClient, String entityName, 
EntityType entityType, int bundleNumber) throws OozieClientException { //sequence start from 0 - List<String> bundleIds = getBundles(oozieClient, - entityName, entityType); - Map<Integer, String> bundleMap = new TreeMap<>(); - String bundleID; - for (String strID : bundleIds) { - LOGGER.info("getSequenceBundleID: " + strID); - int key = Integer.parseInt(strID.substring(0, strID.indexOf('-'))); - bundleMap.put(key, strID); - } - for (Map.Entry<Integer, String> entry : bundleMap.entrySet()) { - LOGGER.info("Key = " + entry.getKey() + ", Value = " + entry.getValue()); - } - int i = 0; - for (Map.Entry<Integer, String> entry : bundleMap.entrySet()) { - bundleID = entry.getValue(); - if (i == bundleNumber) { - return bundleID; - } - i++; + List<String> bundleIds = getBundles(oozieClient, entityName, entityType); + Collections.sort(bundleIds); + if (bundleNumber < bundleIds.size()) { + return bundleIds.get(bundleNumber); } return null; } http://git-wip-us.apache.org/repos/asf/falcon/blob/b4ade492/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java index 93e9a3e..a28472b 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java @@ -19,6 +19,7 @@ package org.apache.falcon.regression.lineage; import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.regression.core.bundle.Bundle; import org.apache.falcon.regression.core.helpers.ColoHelper; import org.apache.falcon.regression.core.util.BundleUtil; @@ 
-74,12 +75,15 @@ public class ListProcessInstancesTest extends BaseTestClass { bundles[0].setInputFeedDataPath(feedDataLocation); bundles[0].setOutputFeedLocationData(baseTestHDFSDir + "/output" + MINUTE_DATE_PATTERN); bundles[0].setProcessValidity(startTime, endTime); + bundles[0].setInputFeedPeriodicity(5, Frequency.TimeUnit.minutes); bundles[0].setProcessConcurrency(3); bundles[0].submitAndScheduleProcess(); processName = bundles[0].getProcessName(); InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); //create data for processes to run and wait some time for instances to make progress - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); + OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0); + OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 1); + OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 2); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 3); }
