Repository: incubator-falcon
Updated Branches:
  refs/heads/master c682b34df -> c1ac6e6af

FALCON-948 Enabling late rerun tests. Contributed by Paul Isaychuk


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/c1ac6e6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/c1ac6e6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/c1ac6e6a

Branch: refs/heads/master
Commit: c1ac6e6afe23bfba8e3cd8b567bb83ee2abf1bc3
Parents: c682b34
Author: Ruslan Ostafiychuk <rostafiyc...@apache.org>
Authored: Mon Dec 22 14:08:11 2014 +0200
Committer: Ruslan Ostafiychuk <rostafiyc...@apache.org>
Committed: Mon Dec 22 14:08:11 2014 +0200

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |  2 +
 .../falcon/regression/core/util/HadoopUtil.java | 31 +++++++-
 .../regression/core/util/InstanceUtil.java      | 15 ----
 .../falcon/regression/core/util/OozieUtil.java  |  8 +-
 .../falcon/regression/FeedLateRerunTest.java    | 77 ++++----------------
 .../regression/ProcessInstanceKillsTest.java    |  2 +-
 .../falcon/regression/ProcessLateRerunTest.java | 14 +---
 7 files changed, 49 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c1ac6e6a/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index b299b40..10795e1 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -184,6 +184,8 @@ Trunk (Unreleased)
   FALCON-681 delete duplicate feed retention test from falcon regression (SamarthG)

   BUG FIXES
+  FALCON-948 Enabling late rerun tests (Paul Isaychuk via Ruslan Ostafiychuk)
+
   FALCON-956 Fix testProcessInstanceStatusTimedOut (Paul Isaychuk via Raghav Kumar Gautam)

   FALCON-937 Fix tests that are still using hdfs root dir in feeds (Raghav Kumar Gautam


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c1ac6e6a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java
index 3cb4f94..64574a5 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java
@@ -20,6 +20,8 @@ package org.apache.falcon.regression.core.util;

 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -49,7 +51,7 @@ public final class HadoopUtil {
     /*
      * Removes 'hdfs(hftp)://server:port'
      */
-    private static String cutProtocol(String path) {
+    public static String cutProtocol(String path) {
         if (StringUtils.isNotEmpty(path)) {
             if (protocol.matcher(path).find()) {
                 return '/' + protocol.split(path)[1];
@@ -146,12 +148,12 @@ public final class HadoopUtil {
      * @param srcFileLocation source location
      * @throws IOException
     */
-    public static void copyDataToFolder(final FileSystem fs, final String dstHdfsDir,
+    public static void copyDataToFolder(final FileSystem fs, String dstHdfsDir,
                                         final String srcFileLocation) throws IOException {
         LOGGER.info(String.format("Copying local dir %s to hdfs location %s on %s",
             srcFileLocation, dstHdfsDir, fs.getUri()));
-        fs.copyFromLocalFile(new Path(srcFileLocation), new Path(dstHdfsDir));
+        fs.copyFromLocalFile(new Path(srcFileLocation), new Path(cutProtocol(dstHdfsDir)));
     }

     /**
@@ -474,4 +476,27 @@ public final class HadoopUtil {
         copyDataToFolders(fs, folderPrefix, folderPaths,
             OSUtil.NORMAL_INPUT + "_SUCCESS",
             OSUtil.NORMAL_INPUT + "log_01.txt");
     }
+
+    /**
+     * Creates empty folders in hdfs.
+     * @param helper target
+     * @param folderList list of folders
+     * @throws IOException
+     * @deprecated method creates filesystem object by itself. We should pass existing FileSystem
+     * object to such methods.
+     */
+    @Deprecated
+    public static void createHDFSFolders(ColoHelper helper, List<String> folderList)
+        throws IOException {
+        LOGGER.info("creating folders.....");
+        Configuration conf = new Configuration();
+        conf.set("fs.default.name", "hdfs://" + helper.getFeedHelper().getHadoopURL());
+        final FileSystem fs = FileSystem.get(conf);
+        for (final String folder : folderList) {
+            if (StringUtils.isNotEmpty(folder)) {
+                fs.mkdirs(new Path(cutProtocol(folder)));
+            }
+        }
+        LOGGER.info("created folders.....");
+    }
 }
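
The copyDataToFolder() change above works because cutProtocol(), now public, reduces a fully
qualified HDFS URI to a plain absolute path before it is handed to the FileSystem. The hunk does
not show the 'protocol' pattern itself, so the stand-alone sketch below uses an assumed regex to
illustrate the same behaviour:

    import java.util.regex.Pattern;

    public final class CutProtocolSketch {
        // Assumed pattern; the real 'protocol' field is defined elsewhere in HadoopUtil.
        private static final Pattern PROTOCOL = Pattern.compile("(hdfs|hftp)://[^/]+/");

        /** Strips a leading 'hdfs(hftp)://server:port' so only the absolute path remains. */
        public static String cutProtocol(String path) {
            if (path != null && !path.isEmpty() && PROTOCOL.matcher(path).find()) {
                return '/' + PROTOCOL.split(path)[1];
            }
            return path;
        }

        public static void main(String[] args) {
            // prints /tmp/falcon-regression/source
            System.out.println(cutProtocol("hdfs://colo1.example.com:8020/tmp/falcon-regression/source"));
            // a path without a scheme is returned unchanged
            System.out.println(cutProtocol("/tmp/falcon-regression/source"));
        }
    }

With the protocol stripped, the same destination string can be used both in entity locations
(which carry the scheme) and in direct FileSystem calls (which expect a path on the cluster the
FileSystem already points at).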

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c1ac6e6a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
index 4f9bd9d..ce9e15b 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
@@ -568,21 +568,6 @@ public final class InstanceUtil {
         return actions.get(instanceNumber).getStatus();
     }

-
-    public static void createHDFSFolders(ColoHelper helper, List<String> folderList)
-        throws IOException {
-        LOGGER.info("creating folders.....");
-        Configuration conf = new Configuration();
-        conf.set("fs.default.name", "hdfs://" + helper.getFeedHelper().getHadoopURL());
-        final FileSystem fs = FileSystem.get(conf);
-        for (final String folder : folderList) {
-            if (StringUtils.isNotEmpty(folder)) {
-                fs.mkdirs(new Path(folder));
-            }
-        }
-        LOGGER.info("created folders.....");
-    }
-
     /**
      * Retrieves replication coordinatorID from bundle of coordinators.
      */
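
The @deprecated note on the relocated helper spells out the preferred shape for new code: accept a
FileSystem the caller already holds instead of building one from a fresh Configuration on every
call. A minimal sketch of that variant (a hypothetical helper, not part of this patch), reusing the
now-public cutProtocol():

    import java.io.IOException;
    import java.util.List;

    import org.apache.commons.lang.StringUtils;
    import org.apache.falcon.regression.core.util.HadoopUtil;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class HdfsFolderSketch {
        private HdfsFolderSketch() {
        }

        /** Creates each folder on the supplied FileSystem, skipping empty entries. */
        static void createFolders(final FileSystem fs, final List<String> folderList) throws IOException {
            for (final String folder : folderList) {
                if (StringUtils.isNotEmpty(folder)) {
                    // drop 'hdfs://host:port' so the path resolves against the given FileSystem
                    fs.mkdirs(new Path(HadoopUtil.cutProtocol(folder)));
                }
            }
        }
    }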

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c1ac6e6a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
index 95919ee..3098729 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
@@ -466,8 +466,8 @@ public final class OozieUtil {
         String bundleID = InstanceUtil.getSequenceBundleID(helper, entityName, type, bundleNumber);
         OozieClient oozieClient = helper.getClusterHelper().getOozieClient();
         List<CoordinatorJob> coords = oozieClient.getBundleJobInfo(bundleID).getCoordinators();
-        InstanceUtil.createHDFSFolders(helper, getMissingDependenciesForInstance(oozieClient, coords,
-            instanceNumber));
+        HadoopUtil.createHDFSFolders(helper, getMissingDependenciesForInstance(oozieClient, coords,
+            instanceNumber));
     }

     private static List<String> getMissingDependenciesForInstance(OozieClient oozieClient,
@@ -501,8 +501,8 @@ public final class OozieUtil {
         for (int instanceNumber = 0; instanceNumber < temp.getActions().size(); instanceNumber++) {
             CoordinatorAction instance = temp.getActions().get(instanceNumber);
-            InstanceUtil.createHDFSFolders(helper,
-                Arrays.asList(instance.getMissingDependencies().split("#")));
+            HadoopUtil.createHDFSFolders(helper,
+                Arrays.asList(instance.getMissingDependencies().split("#")));
         }
     }
 }
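
Both call sites above hand createHDFSFolders() the dependency paths of Oozie coordinator actions,
where getMissingDependencies() returns a single '#'-separated string. A rough sketch of that
gathering step (class and method names are illustrative, not the actual OozieUtil internals):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.oozie.client.CoordinatorAction;
    import org.apache.oozie.client.CoordinatorJob;
    import org.apache.oozie.client.OozieClient;
    import org.apache.oozie.client.OozieClientException;

    final class MissingDependencySketch {
        private MissingDependencySketch() {
        }

        /** Collects every missing dependency path of every action of one coordinator. */
        static List<String> missingDependencies(OozieClient oozieClient, String coordId)
            throws OozieClientException {
            List<String> paths = new ArrayList<String>();
            CoordinatorJob coord = oozieClient.getCoordJobInfo(coordId);
            for (CoordinatorAction action : coord.getActions()) {
                String missing = action.getMissingDependencies();
                if (missing != null && !missing.isEmpty()) {
                    // Oozie reports outstanding inputs as one '#'-separated string
                    paths.addAll(Arrays.asList(missing.split("#")));
                }
            }
            return paths;
        }
    }

The resulting list is what the two hunks above pass straight into HadoopUtil.createHDFSFolders(...).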

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c1ac6e6a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
index da38085..d0a6dde 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
@@ -47,6 +47,7 @@ import java.util.List;
  * On adding further late data it checks whether the data has been replicated correctly in the given late cut-off time.
  * Assuming that late frequency set in server is 3 minutes. Although value can be changed according to requirement.
  */
+@Test(groups = "embedded")
 public class FeedLateRerunTest extends BaseTestClass {

     private ColoHelper cluster1 = servers.get(0);
@@ -67,10 +68,8 @@ public class FeedLateRerunTest extends BaseTestClass {
     public void setUp(Method method) throws JAXBException, IOException {
         LOGGER.info("test name: " + method.getName());
         Bundle bundle = BundleUtil.readFeedReplicationBundle();
-
         bundles[0] = new Bundle(bundle, cluster1);
         bundles[1] = new Bundle(bundle, cluster2);
-
         bundles[0].generateUniqueBundle();
         bundles[1].generateUniqueBundle();
     }
@@ -109,29 +108,22 @@ public class FeedLateRerunTest extends BaseTestClass {
                 .withClusterType(ClusterType.TARGET)
                 .withDataLocation(targetDataLocation)
                 .build()).toString();
-
         String entityName = Util.readEntityName(feed);
         //submit and schedule feed
         AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
         //check if coordinator exists
-
         InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
-
         Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster2.getFeedHelper(), entityName,
-                "REPLICATION"), 1);
-
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), entityName, "REPLICATION"), 1);
         //Finding bundleId of replicated instance on target
         String bundleId = InstanceUtil.getLatestBundleID(cluster2, entityName, EntityType.FEED);
-
         //Finding and creating missing dependencies
         List<String> missingDependencies = getAndCreateDependencies(
             cluster1, cluster1FS, cluster2, cluster2OC, bundleId, false, entityName);
-
         int count = 1;
         for (String location : missingDependencies) {
             if (count==1) {
@@ -139,40 +131,28 @@ public class FeedLateRerunTest extends BaseTestClass {
                 count++;
             }
         }
-
         source=splitPathFromIp(source, "8020");
         LOGGER.info("source : " + source);
         target = source.replace("source", "target");
         LOGGER.info("target : " + target);
-
-
-        /*
-        Sleep for some time ( as is defined in runtime property of server ).
-        Let the instance rerun and then it should succeed.
-        */
-
+        /* Sleep for some time ( as is defined in runtime property of server ).
+        Let the instance rerun and then it should succeed.*/
         int sleepMins = 8;
         for(int i=0; i < sleepMins; i++) {
             LOGGER.info("Waiting...");
             TimeUtil.sleepSeconds(60);
         }
-
-
         String bundleID = InstanceUtil.getLatestBundleID(cluster2, entityName, EntityType.FEED);
         OozieUtil.validateRetryAttempts(cluster2, bundleID, EntityType.FEED, 1);

         //check if data has been replicated correctly
         List<Path> cluster1ReplicatedData = HadoopUtil
-            .getAllFilesRecursivelyHDFS(cluster1FS, new Path(source));
+            .getAllFilesRecursivelyHDFS(cluster1FS, new Path(HadoopUtil.cutProtocol(source)));
         List<Path> cluster2ReplicatedData = HadoopUtil
-            .getAllFilesRecursivelyHDFS(cluster2FS, new Path(target));
-
+            .getAllFilesRecursivelyHDFS(cluster2FS, new Path(HadoopUtil.cutProtocol(target)));
         AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
-
-
     }
-
     @Test(enabled = true)
     public void feedLateRerunTestWithData()
         throws AuthenticationException, IOException, URISyntaxException, JAXBException,
@@ -202,20 +182,15 @@ public class FeedLateRerunTest extends BaseTestClass {
                 .withClusterType(ClusterType.TARGET)
                 .withDataLocation(targetDataLocation)
                 .build()).toString();
-
         String entityName = Util.readEntityName(feed);
         //submit and schedule feed
         AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
         //check if coordinator exists
-
         InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
-
         Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster2.getFeedHelper(), entityName,
-                "REPLICATION"), 1);
-
+            .checkIfFeedCoordExist(cluster2.getFeedHelper(), entityName, "REPLICATION"), 1);
         //Finding bundleId of replicated instance on target
         String bundleId = InstanceUtil.getLatestBundleID(cluster2, entityName, EntityType.FEED);
@@ -223,7 +198,6 @@ public class FeedLateRerunTest extends BaseTestClass {
         //Finding and creating missing dependencies
         List<String> missingDependencies = getAndCreateDependencies(
             cluster1, cluster1FS, cluster2, cluster2OC, bundleId, true, entityName);
-
         int count = 1;
         for (String location : missingDependencies) {
             if (count==1) {
@@ -231,37 +205,27 @@ public class FeedLateRerunTest extends BaseTestClass {
                 count++;
             }
         }
-
         LOGGER.info("source : " + source);
         source=splitPathFromIp(source, "8020");
         LOGGER.info("source : " + source);
         target = source.replace("source", "target");
         LOGGER.info("target : " + target);
-
-
-        /*
-        Sleep for some time ( as is defined in runtime property of server ).
-        Let the instance rerun and then it should succeed.
-        */
-
+        /* Sleep for some time ( as is defined in runtime property of server ).
+        Let the instance rerun and then it should succeed.*/
         int sleepMins = 8;
         for(int i=0; i < sleepMins; i++) {
             LOGGER.info("Waiting...");
             TimeUtil.sleepSeconds(60);
         }
-
-
         String bundleID = InstanceUtil.getLatestBundleID(cluster2, entityName, EntityType.FEED);
         OozieUtil.validateRetryAttempts(cluster2, bundleID, EntityType.FEED, 1);

         //check if data has been replicated correctly
         List<Path> cluster1ReplicatedData = HadoopUtil
-            .getAllFilesRecursivelyHDFS(cluster1FS, new Path(source));
+            .getAllFilesRecursivelyHDFS(cluster1FS, new Path(HadoopUtil.cutProtocol(source)));
         List<Path> cluster2ReplicatedData = HadoopUtil
-            .getAllFilesRecursivelyHDFS(cluster2FS, new Path(target));
-
+            .getAllFilesRecursivelyHDFS(cluster2FS, new Path(HadoopUtil.cutProtocol(target)));
         AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
-
     }

     private String splitPathFromIp(String src, String port) {
@@ -274,7 +238,6 @@ public class FeedLateRerunTest extends BaseTestClass {
                 }
             }
         }
-
         if (tempSrc.isEmpty()) {
             reqSrc = src;
         } else {
@@ -283,16 +246,12 @@ public class FeedLateRerunTest extends BaseTestClass {
         return reqSrc;
     }

-    /*
-    prismHelper1 - source colo
-    prismHelper2 - target colo
-    */
+    /* prismHelper1 - source colo, prismHelper2 - target colo */
     private List<String> getAndCreateDependencies(ColoHelper prismHelper1, FileSystem clusterFS1,
                                                   ColoHelper prismHelper2, OozieClient oozieClient2,
                                                   String bundleId, boolean dataFlag, String entityName)
         throws OozieClientException, IOException {
-
         List<String> missingDependencies = OozieUtil.getMissingDependencies(prismHelper2, bundleId);
         for (int i = 0; i < 10 && missingDependencies == null; ++i) {
             TimeUtil.sleepSeconds(30);
@@ -300,15 +259,12 @@ public class FeedLateRerunTest extends BaseTestClass {
             missingDependencies = OozieUtil.getMissingDependencies(prismHelper2, bundleId);
         }
         Assert.assertNotNull(missingDependencies, "Missing dependencies not found.");
-
         //print missing dependencies
         for (String dependency : missingDependencies) {
             LOGGER.info("dependency from job: " + dependency);
         }
-
         // Creating missing dependencies
-        InstanceUtil.createHDFSFolders(prismHelper1, missingDependencies);
-
+        HadoopUtil.createHDFSFolders(prismHelper1, missingDependencies);
         //Adding data to empty folders depending on dataFlag
         if (dataFlag) {
             int tempCount = 1;
@@ -320,13 +276,10 @@ public class FeedLateRerunTest extends BaseTestClass {
                 }
             }
         }
-
         //replication should start, wait while it ends
         InstanceUtil.waitTillInstanceReachState(oozieClient2, entityName, 1,
             CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
-
         // Adding data for late rerun
-
         int tempCounter = 1;
         for (String dependency : missingDependencies) {
             if (tempCounter==1) {
@@ -335,10 +288,6 @@ public class FeedLateRerunTest extends BaseTestClass {
             }
             tempCounter++;
         }
-
         return missingDependencies;
-
     }
-
-
 }
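
Stripped of the whitespace cleanup, both feed tests above run the same five-step scenario. The
condensed sketch below mirrors the utilities used in FeedLateRerunTest and assumes the imports and
fields already present in that class; the local file name is a placeholder:

    private void runLateRerunScenario(ColoHelper sourceColo, ColoHelper targetColo,
                                      FileSystem sourceFS, OozieClient targetOC,
                                      String bundleId, String entityName) throws Exception {
        // 1. find out what the replication instance on the target is still waiting for
        List<String> missing = OozieUtil.getMissingDependencies(targetColo, bundleId);
        Assert.assertNotNull(missing, "Missing dependencies not found.");

        // 2. create the empty folders on the source so the first run can proceed
        HadoopUtil.createHDFSFolders(sourceColo, missing);

        // 3. wait until the first replication instance succeeds
        InstanceUtil.waitTillInstanceReachState(targetOC, entityName, 1,
            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);

        // 4. drop late data into one of the dependency locations (placeholder local file name)
        HadoopUtil.copyDataToFolder(sourceFS, missing.get(0), "lateData.txt");

        // 5. give the server-side late policy time to fire, then expect exactly one rerun
        TimeUtil.sleepSeconds(8 * 60);
        String latestBundle = InstanceUtil.getLatestBundleID(targetColo, entityName, EntityType.FEED);
        OozieUtil.validateRetryAttempts(targetColo, latestBundle, EntityType.FEED, 1);
    }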

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c1ac6e6a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
index e1a4dd4..88c52d9 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
@@ -162,7 +162,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
             if (actions.size() == 6) {
                 for(int i = 0; i < 5; i++) {
                     CoordinatorAction action = actions.get(i);
-                    InstanceUtil.createHDFSFolders(cluster, Arrays
+                    HadoopUtil.createHDFSFolders(cluster, Arrays
                         .asList(action.getMissingDependencies().split("#")));
                 }
                 break;
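
The hunk above only swaps the helper class, but the surrounding pattern is worth spelling out:
dependency folders are created for every coordinator action except the last one, presumably so
that a single instance is left without its inputs. The same idea as a bare fragment (identifiers
are illustrative, not from the patch):

    List<CoordinatorAction> actions = oozieClient.getCoordJobInfo(coordId).getActions();
    // satisfy all but the last action; the final instance keeps waiting on its data
    for (int i = 0; i < actions.size() - 1; i++) {
        String missing = actions.get(i).getMissingDependencies();
        HadoopUtil.createHDFSFolders(cluster, Arrays.asList(missing.split("#")));
    }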

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c1ac6e6a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
index 488cf74..6804b8d 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
@@ -42,10 +42,8 @@ import java.util.*;
 /**
  * Process late data test.
  */
-
+@Test(groups = "embedded")
 public class ProcessLateRerunTest extends BaseTestClass {
-
-
     private ColoHelper cluster1 = servers.get(0);
     private OozieClient cluster1OC = serverOC.get(0);
     private FileSystem cluster1FS = serverFS.get(0);
@@ -79,7 +77,6 @@ public class ProcessLateRerunTest extends BaseTestClass {
      * It checks the number of rerun attempts once late data has been added
      * ensuring that late rerun happened.
      */
-
     @Test(enabled = true)
     public void testProcessLateRerunOnEmptyFolder() throws Exception {
         String startTime = TimeUtil.getTimeWrtSystemTime(0);
@@ -106,7 +103,6 @@ public class ProcessLateRerunTest extends BaseTestClass {
             LOGGER.info("Waiting...");
             TimeUtil.sleepSeconds(60);
         }
-
         InstanceUtil.waitTillInstanceReachState(cluster1OC,
             Util.getProcessName(bundles[0].getProcessData()), 1,
             CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
@@ -116,7 +112,6 @@ public class ProcessLateRerunTest extends BaseTestClass {
         String bundleID = bundleList.get(0);

         OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 1);
-
     }

     /**
@@ -150,7 +145,6 @@ public class ProcessLateRerunTest extends BaseTestClass {
             LOGGER.info("Waiting...");
             TimeUtil.sleepSeconds(60);
         }
-
         InstanceUtil.waitTillInstanceReachState(cluster1OC,
             Util.getProcessName(bundles[0].getProcessData()), 1,
             CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
@@ -160,7 +154,6 @@ public class ProcessLateRerunTest extends BaseTestClass {
         String bundleID = bundleList.get(0);

         OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 1);
-
     }

     /**
@@ -198,7 +191,6 @@ public class ProcessLateRerunTest extends BaseTestClass {
             LOGGER.info("Waiting...");
             TimeUtil.sleepSeconds(60);
         }
-
         InstanceUtil.waitTillInstanceReachState(cluster1OC,
             Util.getProcessName(bundles[0].getProcessData()), 1,
             CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
@@ -208,7 +200,6 @@ public class ProcessLateRerunTest extends BaseTestClass {
         String bundleID = bundleList.get(0);

         OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 1);
-
     }

     /**
@@ -267,14 +258,12 @@ public class ProcessLateRerunTest extends BaseTestClass {
         String bundleID = bundleList.get(0);

         OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 0);
-
     }

     /*
    dataFlag - denotes whether process should run initially on empty folders or folders containing data
    dataFolder - denotes the folder where you want to upload data for late rerun
     */
-
     private void getAndCreateDependencies(ColoHelper prismHelper, Bundle bundle,
                                           OozieClient oozieClient, FileSystem clusterFS,
                                           boolean dataFlag, int dataFolder) {
@@ -355,5 +344,4 @@ public class ProcessLateRerunTest extends BaseTestClass {
         lateProcess.getLateInputs().add(lateInput);
         return lateProcess;
     }
-
 }
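
All of the process tests above finish with OozieUtil.validateRetryAttempts(...). As an assumed
equivalent (not the utility's real implementation), the same fact can be read directly from Oozie
through the run counter of the instance's workflow job:

    import org.apache.oozie.client.CoordinatorAction;
    import org.apache.oozie.client.CoordinatorJob;
    import org.apache.oozie.client.OozieClient;
    import org.apache.oozie.client.WorkflowJob;
    import org.testng.Assert;

    final class RerunCheckSketch {
        private RerunCheckSketch() {
        }

        /** Asserts that the first instance of the coordinator was rerun the expected number of times. */
        static void assertRerunCount(OozieClient oozieClient, String coordId, int expectedReruns)
            throws Exception {
            CoordinatorJob coord = oozieClient.getCoordJobInfo(coordId);
            CoordinatorAction first = coord.getActions().get(0);
            WorkflowJob workflow = oozieClient.getJobInfo(first.getExternalId());
            // run 0 is the original attempt, so one late rerun shows up as getRun() == 1
            Assert.assertEquals(workflow.getRun(), expectedReruns,
                "unexpected number of reruns for " + first.getId());
        }
    }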