FALCON-1151 Migrate Oozie-related methods from InstanceUtil.java to OozieUtil.java. Contributed by Paul Isaychuk
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/d0c9850e Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/d0c9850e Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/d0c9850e Branch: refs/heads/master Commit: d0c9850e5abf56b3e3c500ee744e96e634ba7691 Parents: 9d5429a Author: Ruslan Ostafiychuk <[email protected]> Authored: Tue Apr 14 17:00:25 2015 +0300 Committer: Ruslan Ostafiychuk <[email protected]> Committed: Tue Apr 14 17:00:25 2015 +0300 ---------------------------------------------------------------------- falcon-regression/CHANGES.txt | 3 + .../regression/core/util/InstanceUtil.java | 382 +++----------- .../falcon/regression/core/util/OozieUtil.java | 332 +++++++++--- .../falcon/regression/AuthorizationTest.java | 86 ++-- .../regression/ELExpCurrentAndLastWeekTest.java | 19 +- .../falcon/regression/ELValidationsTest.java | 18 +- .../regression/EmbeddedPigScriptTest.java | 3 +- .../falcon/regression/ExternalFSTest.java | 11 +- .../regression/FeedClusterUpdateTest.java | 510 +++++-------------- .../regression/FeedInstanceListingTest.java | 19 +- .../falcon/regression/FeedLateRerunTest.java | 32 +- .../falcon/regression/FeedReplicationTest.java | 28 +- .../regression/FeedSubmitAndScheduleTest.java | 6 +- .../falcon/regression/InstanceParamTest.java | 6 +- .../falcon/regression/InstanceSummaryTest.java | 2 +- .../apache/falcon/regression/LogMoverTest.java | 4 +- .../apache/falcon/regression/NewRetryTest.java | 19 +- .../regression/ProcessInstanceKillsTest.java | 26 +- .../regression/ProcessInstanceRerunTest.java | 36 +- .../regression/ProcessInstanceResumeTest.java | 12 +- .../regression/ProcessInstanceRunningTest.java | 10 +- .../regression/ProcessInstanceStatusTest.java | 50 +- .../regression/ProcessInstanceSuspendTest.java | 10 +- .../falcon/regression/ProcessLateRerunTest.java | 20 +- .../regression/ProcessLibPathLoadTest.java | 14 +- 
.../falcon/regression/ProcessLibPathTest.java | 10 +- .../regression/TouchAPIPrismAndServerTest.java | 46 +- .../regression/hcat/HCatFeedOperationsTest.java | 6 +- .../regression/hcat/HCatReplicationTest.java | 16 +- .../regression/lineage/EntitySummaryTest.java | 15 +- .../lineage/LineageApiProcessInstanceTest.java | 5 +- .../lineage/ListFeedInstancesTest.java | 2 +- .../lineage/ListProcessInstancesTest.java | 2 +- .../falcon/regression/prism/FeedDelayTest.java | 24 +- .../prism/NewPrismProcessUpdateTest.java | 328 ++++++------ .../regression/prism/OptionalInputTest.java | 2 +- .../prism/PrismFeedLateReplicationTest.java | 107 ++-- .../PrismFeedReplicationPartitionExpTest.java | 98 +--- .../prism/PrismFeedReplicationUpdateTest.java | 30 +- .../regression/prism/PrismFeedUpdateTest.java | 6 +- .../prism/PrismProcessScheduleTest.java | 10 +- .../RescheduleProcessInFinalStatesTest.java | 40 +- .../falcon/regression/prism/RetentionTest.java | 2 +- .../prism/UpdateAtSpecificTimeTest.java | 150 +++--- .../falcon/regression/ui/ProcessUITest.java | 8 +- 45 files changed, 1063 insertions(+), 1502 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/CHANGES.txt ---------------------------------------------------------------------- diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt index 891576c..492814b 100644 --- a/falcon-regression/CHANGES.txt +++ b/falcon-regression/CHANGES.txt @@ -63,6 +63,9 @@ Trunk (Unreleased) via Samarth Gupta) IMPROVEMENTS + FALCON-1151 Migrate oozie related methods from InstanceUtil.java to OozieUtil.java + (Paul Isaychuk via Ruslan Ostafiychuk) + FALCON-1138: JDK requirement for merlin should be 1.7 (Raghav Kumar Gautam) FALCON-1135 Migrate methods related to *Merlin.java classes from InstanceUtil.java and 
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java index 6c90256..723ea89 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java @@ -28,7 +28,6 @@ import org.apache.commons.lang.StringUtils; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.regression.core.bundle.Bundle; import org.apache.falcon.regression.core.enumsAndConstants.ResponseErrors; -import org.apache.falcon.regression.core.helpers.ColoHelper; import org.apache.falcon.regression.core.helpers.entity.AbstractEntityHelper; import org.apache.falcon.request.BaseRequest; import org.apache.falcon.resource.APIResult; @@ -41,7 +40,6 @@ import org.apache.log4j.Logger; import org.apache.oozie.client.BundleJob; import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.client.CoordinatorJob; -import org.apache.oozie.client.Job; import org.apache.oozie.client.Job.Status; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.OozieClientException; @@ -58,29 +56,26 @@ import java.util.Collections; import java.util.Date; import java.util.EnumSet; import java.util.List; -import java.util.Map; -import java.util.TreeMap; /** * util functions related to instanceTest. */ public final class InstanceUtil { - private InstanceUtil() { - throw new AssertionError("Instantiating utility class..."); - } - + public static final int INSTANCES_CREATED_TIMEOUT = OSUtil.IS_WINDOWS ? 
20 : 10; private static final Logger LOGGER = Logger.getLogger(InstanceUtil.class); private static final EnumSet<Status> RUNNING_PREP_SUCCEEDED = EnumSet.of(Status.RUNNING, Status.PREP, Status.SUCCEEDED); + private InstanceUtil() { + throw new AssertionError("Instantiating utility class..."); + } + public static APIResult sendRequestProcessInstance(String url, String user) throws IOException, URISyntaxException, AuthenticationException, InterruptedException { return hitUrl(url, Util.getMethodType(url), user); } - public static final int INSTANCES_CREATED_TIMEOUT = OSUtil.IS_WINDOWS ? 20 : 10; - public static APIResult hitUrl(String url, String method, String user) throws URISyntaxException, IOException, AuthenticationException, InterruptedException { @@ -169,11 +164,20 @@ public final class InstanceUtil { return Collections.frequency(statuses, workflowStatus); } + /** + * Validates that response doesn't contains instances. + * @param r response + */ public static void validateSuccessWOInstances(InstancesResult r) { AssertUtil.assertSucceeded(r); Assert.assertNull(r.getInstances(), "Unexpected :" + Arrays.toString(r.getInstances())); } + /** + * Validates that failed response contains specific error message. 
+ * @param instancesResult response + * @param error expected error + */ public static void validateError(InstancesResult instancesResult, ResponseErrors error) { Assert.assertTrue(instancesResult.getMessage().contains(error.getError()), "Error should contains '" + error.getError() + "'"); @@ -205,7 +209,6 @@ public final class InstanceUtil { LOGGER.info("status: " + status + ", instance: " + instance.getInstance()); statuses.add(status); } - Assert.assertEquals(Collections.frequency(statuses, InstancesResult.WorkflowStatus.RUNNING), runningCount, "Running Instances"); Assert.assertEquals(Collections.frequency(statuses, InstancesResult.WorkflowStatus.SUSPENDED), @@ -216,22 +219,27 @@ public final class InstanceUtil { killedCount, "Killed Instances"); } + /** + * Retrieves workflow IDs from every instances from response. + * @param instancesResult response + * @return list of workflow IDs + */ public static List<String> getWorkflowJobIds(InstancesResult instancesResult) { InstancesResult.Instance[] instances = instancesResult.getInstances(); - LOGGER.info("instances: " + Arrays.toString(instances)); - Assert.assertNotNull(instances, "instances should be not null"); - List<String> wfids = new ArrayList<String>(); + LOGGER.info("Instances: " + Arrays.toString(instances)); + Assert.assertNotNull(instances, "Instances should be not null"); + List<String> wfIds = new ArrayList<String>(); for (InstancesResult.Instance instance : instances) { - LOGGER.warn("instance: " + instance + " , status: " - + instance.getStatus() + ", logs : " + instance.getLogFile()); + LOGGER.warn(String.format( + "instance: %s, status: %s, logs : %s", instance, instance.getStatus(), instance.getLogFile())); if (instance.getStatus().name().equals("RUNNING") || instance.getStatus().name().equals("SUCCEEDED")) { - wfids.add(instance.getLogFile()); + wfIds.add(instance.getLogFile()); } if (instance.getStatus().name().equals("KILLED") || instance.getStatus().name().equals("WAITING")) { 
Assert.assertNull(instance.getLogFile()); } } - return wfids; + return wfIds; } /** @@ -250,21 +258,27 @@ public final class InstanceUtil { } } Assert.assertEquals(counter, failCount, "Actual number of failed instances does not " - + "match expected number of failed instances."); + + "match to expected number of failed instances."); } - public static List<String> getWorkflows(ColoHelper prismHelper, String processName, + /** + * Gets process workflows by given statuses. + * @param oozieClient oozie client of cluster where process is running + * @param processName process name + * @param statuses statuses workflows will be selected by + * @return list of matching workflows + * @throws OozieClientException + */ + public static List<String> getWorkflows(OozieClient oozieClient, String processName, WorkflowJob.Status... statuses) throws OozieClientException { - OozieClient oozieClient = prismHelper.getClusterHelper().getOozieClient(); String bundleID = OozieUtil.getBundles(oozieClient, processName, EntityType.PROCESS).get(0); - List<String> workflowJobIds = OozieUtil.getWorkflowJobs(prismHelper, bundleID); + List<String> workflowJobIds = OozieUtil.getWorkflowJobs(oozieClient, bundleID); List<String> toBeReturned = new ArrayList<String>(); for (String jobId : workflowJobIds) { WorkflowJob wfJob = oozieClient.getJobInfo(jobId); LOGGER.info("wfJob.getId(): " + wfJob.getId() + " wfJob.getStartTime(): " - + wfJob.getStartTime() - + "jobId: " + jobId + " wfJob.getStatus(): " + wfJob.getStatus()); + + wfJob.getStartTime() + "jobId: " + jobId + " wfJob.getStatus(): " + wfJob.getStatus()); if (statuses.length == 0 || Arrays.asList(statuses).contains(wfJob.getStatus())) { toBeReturned.add(jobId); } @@ -304,44 +318,17 @@ public final class InstanceUtil { } } - public static List<CoordinatorAction> getProcessInstanceList(ColoHelper coloHelper, + public static List<CoordinatorAction> getProcessInstanceList(OozieClient oozieClient, String processName, EntityType entityType) throws 
OozieClientException { - OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient(); - String coordId = getLatestCoordinatorID(oozieClient, processName, entityType); + String coordId = OozieUtil.getLatestCoordinatorID(oozieClient, processName, entityType); //String coordId = getDefaultCoordinatorFromProcessName(processName); LOGGER.info("default coordID: " + coordId); return oozieClient.getCoordJobInfo(coordId).getActions(); } - public static String getLatestCoordinatorID(OozieClient oozieClient, String processName, - EntityType entityType) throws OozieClientException { - final String latestBundleID = getLatestBundleID(oozieClient, processName, entityType); - return getDefaultCoordIDFromBundle(oozieClient, latestBundleID); - } - - public static String getDefaultCoordIDFromBundle(OozieClient oozieClient, String bundleId) - throws OozieClientException { - OozieUtil.waitForCoordinatorJobCreation(oozieClient, bundleId); - BundleJob bundleInfo = oozieClient.getBundleJobInfo(bundleId); - List<CoordinatorJob> coords = bundleInfo.getCoordinators(); - int min = 100000; - String minString = ""; - for (CoordinatorJob coord : coords) { - String strID = coord.getId(); - if (min > Integer.parseInt(strID.substring(0, strID.indexOf('-')))) { - min = Integer.parseInt(strID.substring(0, strID.indexOf('-'))); - minString = coord.getId(); - } - } - LOGGER.info("function getDefaultCoordIDFromBundle: minString: " + minString); - return minString; - } - - public static int getInstanceCountWithStatus(ColoHelper coloHelper, String processName, - org.apache.oozie.client.CoordinatorAction.Status status, EntityType entityType) - throws OozieClientException { - List<CoordinatorAction> coordActions = getProcessInstanceList(coloHelper, processName, - entityType); + public static int getInstanceCountWithStatus(OozieClient oozieClient, String processName, + CoordinatorAction.Status status, EntityType entityType) throws OozieClientException { + List<CoordinatorAction> coordActions = 
getProcessInstanceList(oozieClient, processName, entityType); List<CoordinatorAction.Status> statuses = new ArrayList<CoordinatorAction.Status>(); for (CoordinatorAction action : coordActions) { statuses.add(action.getStatus()); @@ -349,109 +336,10 @@ public final class InstanceUtil { return Collections.frequency(statuses, status); } - public static Status getDefaultCoordinatorStatus(ColoHelper colohelper, String processName, - int bundleNumber) throws OozieClientException { - OozieClient oozieClient = colohelper.getProcessHelper().getOozieClient(); - String bundleID = - getSequenceBundleID(oozieClient, processName, EntityType.PROCESS, bundleNumber); - String coordId = getDefaultCoordIDFromBundle(oozieClient, bundleID); - return oozieClient.getCoordJobInfo(coordId).getStatus(); - } - - /** - * Retrieves all coordinators of bundle. - * - * @param oozieClient Oozie client to use for fetching info. - * @param bundleID specific bundle ID - * @return list of bundle coordinators - * @throws OozieClientException - */ - public static List<CoordinatorJob> getBundleCoordinators(OozieClient oozieClient, - String bundleID) throws OozieClientException { - BundleJob bundleInfo = oozieClient.getBundleJobInfo(bundleID); - return bundleInfo.getCoordinators(); - } - - /** - * Retrieves the latest bundle ID. - * - * @param coloHelper colo helper of cluster job is running on - * @param entityName name of entity job is related to - * @param entityType type of entity - feed or process expected - * @return latest bundle ID - * @throws OozieClientException - */ - public static String getLatestBundleID(ColoHelper coloHelper, - String entityName, EntityType entityType) - throws OozieClientException { - final OozieClient oozieClient = coloHelper.getFeedHelper().getOozieClient(); - return getLatestBundleID(oozieClient, entityName, entityType); - } - - /** - * Retrieves the latest bundle ID. 
- * - * @param oozieClient where job is running - * @param entityName name of entity job is related to - * @param entityType type of entity - feed or process expected - * @return latest bundle ID - * @throws OozieClientException - */ - public static String getLatestBundleID(OozieClient oozieClient, - String entityName, EntityType entityType) throws OozieClientException { - List<String> bundleIds = OozieUtil.getBundles(oozieClient, entityName, entityType); - String max = "0"; - int maxID = -1; - for (String strID : bundleIds) { - if (maxID < Integer.parseInt(strID.substring(0, strID.indexOf('-')))) { - maxID = Integer.parseInt(strID.substring(0, strID.indexOf('-'))); - max = strID; - } - } - return max; - } - - /** - * Retrieves ID of bundle related to some process/feed using its ordinal number. - * - * @param entityName - name of entity bundle is related to - * @param entityType - feed or process - * @param bundleNumber - ordinal number of bundle - * @return bundle ID - * @throws OozieClientException - */ - public static String getSequenceBundleID(OozieClient oozieClient, String entityName, - EntityType entityType, int bundleNumber) - throws OozieClientException { - - //sequence start from 0 - List<String> bundleIds = OozieUtil.getBundles(oozieClient, - entityName, entityType); - Map<Integer, String> bundleMap = new TreeMap<Integer, String>(); - String bundleID; - for (String strID : bundleIds) { - LOGGER.info("getSequenceBundleID: " + strID); - int key = Integer.parseInt(strID.substring(0, strID.indexOf('-'))); - bundleMap.put(key, strID); - } - for (Map.Entry<Integer, String> entry : bundleMap.entrySet()) { - LOGGER.info("Key = " + entry.getKey() + ", Value = " + entry.getValue()); - } - int i = 0; - for (Map.Entry<Integer, String> entry : bundleMap.entrySet()) { - bundleID = entry.getValue(); - if (i == bundleNumber) { - return bundleID; - } - i++; - } - return null; - } - /** * Retrieves status of one instance. 
* - * @param coloHelper - server from which instance status will be retrieved. + * @param oozieClient - server from which instance status will be retrieved. * @param processName - name of process which mentioned instance belongs to. * @param bundleNumber - ordinal number of one of the bundle which are related to that * process. @@ -459,17 +347,13 @@ public final class InstanceUtil { * @return - state of mentioned instance. * @throws OozieClientException */ - public static CoordinatorAction.Status getInstanceStatus(ColoHelper coloHelper, - String processName, - int bundleNumber, int - instanceNumber) throws OozieClientException { - final OozieClient oozieClient = coloHelper.getClusterHelper().getOozieClient(); - String bundleID = - getSequenceBundleID(oozieClient, processName, EntityType.PROCESS, bundleNumber); + public static CoordinatorAction.Status getInstanceStatus(OozieClient oozieClient, String processName, + int bundleNumber, int instanceNumber) throws OozieClientException { + String bundleID = OozieUtil.getSequenceBundleID(oozieClient, processName, EntityType.PROCESS, bundleNumber); if (StringUtils.isEmpty(bundleID)) { return null; } - String coordID = InstanceUtil.getDefaultCoordIDFromBundle(oozieClient, bundleID); + String coordID = OozieUtil.getDefaultCoordIDFromBundle(oozieClient, bundleID); if (StringUtils.isEmpty(coordID)) { return null; } @@ -487,22 +371,6 @@ public final class InstanceUtil { } /** - * Retrieves replication coordinatorID from bundle of coordinators. 
- */ - public static List<String> getReplicationCoordID(String bundleId, AbstractEntityHelper helper) - throws OozieClientException { - final OozieClient oozieClient = helper.getOozieClient(); - List<CoordinatorJob> coords = InstanceUtil.getBundleCoordinators(oozieClient, bundleId); - List<String> replicationCoordID = new ArrayList<String>(); - for (CoordinatorJob coord : coords) { - if (coord.getAppName().contains("FEED_REPLICATION")) { - replicationCoordID.add(coord.getId()); - } - } - return replicationCoordID; - } - - /** * Forms and sends process instance request based on url of action to be performed and it's * parameters. * @@ -510,8 +378,7 @@ public final class InstanceUtil { * @param user - whose credentials will be used for this action * @return result from API */ - public static APIResult createAndSendRequestProcessInstance( - String url, String params, String colo, String user) + public static APIResult createAndSendRequestProcessInstance(String url, String params, String colo, String user) throws IOException, URISyntaxException, AuthenticationException, InterruptedException { if (params != null && !colo.equals("")) { url = url + params + "&" + colo.substring(1); @@ -520,12 +387,11 @@ public final class InstanceUtil { } else { url = url + colo; } - return InstanceUtil.sendRequestProcessInstance(url, user); + return sendRequestProcessInstance(url, user); } public static org.apache.oozie.client.WorkflowJob.Status getInstanceStatusFromCoord( - ColoHelper coloHelper, String coordID, int instanceNumber) throws OozieClientException { - OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient(); + OozieClient oozieClient, String coordID, int instanceNumber) throws OozieClientException { CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID); String jobId = coordInfo.getActions().get(instanceNumber).getExternalId(); LOGGER.info("jobId = " + jobId); @@ -537,15 +403,14 @@ public final class InstanceUtil { } public static List<String> 
getInputFoldersForInstanceForReplication( - ColoHelper coloHelper, String coordID, int instanceNumber) throws OozieClientException { - OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient(); + OozieClient oozieClient, String coordID, int instanceNumber) throws OozieClientException { CoordinatorAction x = oozieClient.getCoordActionInfo(coordID + "@" + instanceNumber); String jobId = x.getExternalId(); WorkflowJob wfJob = oozieClient.getJobInfo(jobId); - return InstanceUtil.getReplicationFolderFromInstanceRunConf(wfJob.getConf()); + return getReplicationFolderFromInstanceRunConf(wfJob.getConf()); } - public static List<String> getReplicationFolderFromInstanceRunConf(String runConf) { + private static List<String> getReplicationFolderFromInstanceRunConf(String runConf) { String conf; conf = runConf.substring(runConf.indexOf("falconInPaths</name>") + 20); conf = conf.substring(conf.indexOf("<value>") + 7); @@ -553,13 +418,10 @@ public final class InstanceUtil { return new ArrayList<String>(Arrays.asList(conf.split(","))); } - public static int getInstanceRunIdFromCoord(ColoHelper colo, String coordID, int instanceNumber) + public static int getInstanceRunIdFromCoord(OozieClient oozieClient, String coordID, int instanceNumber) throws OozieClientException { - OozieClient oozieClient = colo.getProcessHelper().getOozieClient(); CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID); - - WorkflowJob actionInfo = - oozieClient.getJobInfo(coordInfo.getActions().get(instanceNumber).getExternalId()); + WorkflowJob actionInfo = oozieClient.getJobInfo(coordInfo.getActions().get(instanceNumber).getExternalId()); return actionInfo.getRun(); } @@ -575,11 +437,11 @@ public final class InstanceUtil { List<String> bundleIds = OozieUtil.getBundles(oozieClient, feedName, EntityType.FEED); LOGGER.info("bundleIds: " + bundleIds); - for (String aBundleId : bundleIds) { - LOGGER.info("aBundleId: " + aBundleId); - OozieUtil.waitForCoordinatorJobCreation(oozieClient, 
aBundleId); + for (String bundleId : bundleIds) { + LOGGER.info("bundleId: " + bundleId); + OozieUtil.waitForCoordinatorJobCreation(oozieClient, bundleId); List<CoordinatorJob> coords = - InstanceUtil.getBundleCoordinators(oozieClient, aBundleId); + OozieUtil.getBundleCoordinators(oozieClient, bundleId); LOGGER.info("coords: " + coords); for (CoordinatorJob coord : coords) { if (coord.getAppName().contains(coordType)) { @@ -591,9 +453,8 @@ public final class InstanceUtil { } public static List<CoordinatorAction> getProcessInstanceListFromAllBundles( - ColoHelper coloHelper, String processName, EntityType entityType) + OozieClient oozieClient, String processName, EntityType entityType) throws OozieClientException { - OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient(); List<CoordinatorAction> list = new ArrayList<CoordinatorAction>(); final List<String> bundleIds = OozieUtil.getBundles(oozieClient, processName, entityType); LOGGER.info("bundle size for process is " + bundleIds.size()); @@ -609,37 +470,31 @@ public final class InstanceUtil { list.addAll(actions); } } - String coordId = getLatestCoordinatorID(oozieClient, processName, entityType); + String coordId = OozieUtil.getLatestCoordinatorID(oozieClient, processName, entityType); LOGGER.info("default coordID: " + coordId); return list; } - public static String getOutputFolderForInstanceForReplication(ColoHelper coloHelper, + public static String getOutputFolderForInstanceForReplication(OozieClient oozieClient, String coordID, int instanceNumber) throws OozieClientException { - OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient(); CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID); final CoordinatorAction coordAction = coordInfo.getActions().get(instanceNumber); final String actionConf = oozieClient.getJobInfo(coordAction.getExternalId()).getConf(); - return InstanceUtil.getReplicatedFolderFromInstanceRunConf(actionConf); + return 
getReplicatedFolderFromInstanceRunConf(actionConf); } - private static String getReplicatedFolderFromInstanceRunConf( - String runConf) { - String inputPathExample = - InstanceUtil.getReplicationFolderFromInstanceRunConf(runConf).get(0); - String postFix = inputPathExample - .substring(inputPathExample.length() - 7, inputPathExample.length()); + private static String getReplicatedFolderFromInstanceRunConf(String runConf) { + String inputPathExample = getReplicationFolderFromInstanceRunConf(runConf).get(0); + String postFix = inputPathExample.substring(inputPathExample.length() - 7, inputPathExample.length()); return getReplicatedFolderBaseFromInstanceRunConf(runConf) + postFix; } public static String getOutputFolderBaseForInstanceForReplication( - ColoHelper coloHelper, String coordID, int instanceNumber) throws OozieClientException { - OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient(); + OozieClient oozieClient, String coordID, int instanceNumber) throws OozieClientException { CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID); - final CoordinatorAction coordAction = coordInfo.getActions().get(instanceNumber); final String actionConf = oozieClient.getJobInfo(coordAction.getExternalId()).getConf(); - return InstanceUtil.getReplicatedFolderBaseFromInstanceRunConf(actionConf); + return getReplicatedFolderBaseFromInstanceRunConf(actionConf); } private static String getReplicatedFolderBaseFromInstanceRunConf(String runConf) { @@ -661,10 +516,8 @@ public final class InstanceUtil { * @param totalMinutesToWait time in minutes for which instance state should be polled * @throws OozieClientException */ - public static void waitTillInstanceReachState(OozieClient client, String entityName, - int instancesNumber, - CoordinatorAction.Status expectedStatus, - EntityType entityType, int totalMinutesToWait) + public static void waitTillInstanceReachState(OozieClient client, String entityName, int instancesNumber, + CoordinatorAction.Status 
expectedStatus, EntityType entityType, int totalMinutesToWait) throws OozieClientException { String filter; // get the bundle ids @@ -682,7 +535,7 @@ public final class InstanceUtil { TimeUtil.sleepSeconds(5); } if (bundleJobs.size() == 0) { - Assert.assertTrue(false, "Could not retrieve bundles"); + Assert.fail("Could not retrieve bundles"); } List<String> bundleIds = OozieUtil.getBundleIds(bundleJobs); String bundleId = OozieUtil.getMaxId(bundleIds); @@ -735,7 +588,7 @@ public final class InstanceUtil { TimeUtil.sleepSeconds(sleepTime); } } - Assert.assertTrue(false, "expected state of instance was never reached"); + Assert.fail("expected state of instance was never reached"); } /** @@ -759,54 +612,6 @@ public final class InstanceUtil { } /** - * Waits till bundle job will reach expected status. - * Generates time according to expected status. - * - * @param coloHelper colo helper of cluster job is running on - * @param processName name of process which job is being analyzed - * @param expectedStatus job status we are waiting for - * @throws OozieClientException - */ - public static void waitForBundleToReachState(ColoHelper coloHelper, - String processName, Job.Status expectedStatus) throws - OozieClientException { - int totalMinutesToWait = getMinutesToWait(expectedStatus); - waitForBundleToReachState(coloHelper, processName, expectedStatus, totalMinutesToWait); - } - - /** - * Waits till bundle job will reach expected status during specific time. 
- * Use it directly in test cases when timeouts are different from trivial, in other cases use - * waitForBundleToReachState(ColoHelper, String, Status) - * - * @param coloHelper colo helper of cluster job is running on - * @param processName name of process which job is being analyzed - * @param expectedStatus job status we are waiting for - * @param totalMinutesToWait specific time to wait expected state - * @throws OozieClientException - */ - public static void waitForBundleToReachState(ColoHelper coloHelper, - String processName, Job.Status expectedStatus, int totalMinutesToWait) throws - OozieClientException { - - int sleep = totalMinutesToWait * 60 / 20; - for (int sleepCount = 0; sleepCount < sleep; sleepCount++) { - String bundleID = - InstanceUtil.getLatestBundleID(coloHelper, processName, EntityType.PROCESS); - OozieClient oozieClient = - coloHelper.getProcessHelper().getOozieClient(); - BundleJob j = oozieClient.getBundleJobInfo(bundleID); - LOGGER.info(sleepCount + ". Current status: " + j.getStatus() - + "; expected: " + expectedStatus); - if (j.getStatus() == expectedStatus) { - return; - } - TimeUtil.sleepSeconds(20); - } - Assert.fail("State " + expectedStatus + " wasn't reached in " + totalMinutesToWait + " mins"); - } - - /** * Generates time which is presumably needed for process/feed instances to reach particular * state. * Feed instances are running faster then process, so feed timeouts are less then process. 
@@ -815,8 +620,7 @@ public final class InstanceUtil { * @param expectedStatus expected status we are waiting for * @return minutes to wait for expected status */ - private static int getMinutesToWait(EntityType entityType, - CoordinatorAction.Status expectedStatus) { + private static int getMinutesToWait(EntityType entityType, CoordinatorAction.Status expectedStatus) { switch (expectedStatus) { case RUNNING: if (entityType == EntityType.PROCESS) { @@ -841,43 +645,22 @@ public final class InstanceUtil { } /** - * Generates time which is presumably needed for bundle job to reach particular state. - * - * @param expectedStatus status which we are expect to get from bundle job - * @return minutes to wait for expected status - */ - private static int getMinutesToWait(Job.Status expectedStatus) { - switch (expectedStatus) { - case DONEWITHERROR: - case SUCCEEDED: - return OSUtil.IS_WINDOWS ? 40 : 20; - case KILLED: - return OSUtil.IS_WINDOWS ? 30 : 15; - default: - return OSUtil.IS_WINDOWS ? 60 : 30; - } - } - - /** * Waits till instances of specific job will be created during specific time. * Use this method directly in unusual test cases where timeouts are different from trivial. - * In other cases use waitTillInstancesAreCreated(ColoHelper,String,int) + * In other cases use waitTillInstancesAreCreated(OozieClient,String,int) * * @param oozieClient oozie client of the cluster on which job is running * @param entity definition of entity which describes job * @param bundleSeqNo bundle number if update has happened. 
* @throws OozieClientException */ - public static void waitTillInstancesAreCreated(OozieClient oozieClient, - String entity, - int bundleSeqNo, - int totalMinutesToWait - ) throws OozieClientException { + public static void waitTillInstancesAreCreated(OozieClient oozieClient, String entity, int bundleSeqNo, + int totalMinutesToWait) throws OozieClientException { String entityName = Util.readEntityName(entity); EntityType type = Util.getEntityType(entity); - String bundleID = getSequenceBundleID(oozieClient, entityName, + String bundleID = OozieUtil.getSequenceBundleID(oozieClient, entityName, type, bundleSeqNo); - String coordID = getDefaultCoordIDFromBundle(oozieClient, bundleID); + String coordID = OozieUtil.getDefaultCoordIDFromBundle(oozieClient, bundleID); for (int sleepCount = 0; sleepCount < totalMinutesToWait; sleepCount++) { CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID); @@ -894,17 +677,14 @@ public final class InstanceUtil { * Waits till instances of specific job will be created during timeout. * Timeout is common for most of usual test cases. * - * @param coloHelper colo helper of cluster job is running on + * @param oozieClient oozieClient of cluster job is running on * @param entity definition of entity which describes job * @param bundleSeqNo bundle number if update has happened. 
* @throws OozieClientException */ - public static void waitTillInstancesAreCreated(ColoHelper coloHelper, - String entity, - int bundleSeqNo + public static void waitTillInstancesAreCreated(OozieClient oozieClient, String entity, int bundleSeqNo ) throws OozieClientException { int sleep = INSTANCES_CREATED_TIMEOUT * 60 / 5; - final OozieClient oozieClient = coloHelper.getClusterHelper().getOozieClient(); waitTillInstancesAreCreated(oozieClient, entity, bundleSeqNo, sleep); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java index baa69c7..138b45f 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java @@ -20,6 +20,7 @@ package org.apache.falcon.regression.core.util; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.regression.core.helpers.ColoHelper; +import org.apache.falcon.regression.core.helpers.entity.AbstractEntityHelper; import org.apache.oozie.client.AuthOozieClient; import org.apache.oozie.client.BundleJob; import org.apache.oozie.client.CoordinatorAction; @@ -237,11 +238,10 @@ public final class OozieUtil { } - public static List<String> getMissingDependencies(ColoHelper helper, String bundleID) + public static List<String> getMissingDependencies(OozieClient oozieClient, String bundleID) throws OozieClientException { CoordinatorJob jobInfo; jobInfo = null; - OozieClient oozieClient = helper.getClusterHelper().getOozieClient(); BundleJob bundleJob = oozieClient.getBundleJobInfo(bundleID); 
List<CoordinatorJob> coordinatorJobList = bundleJob.getCoordinators(); if (coordinatorJobList.size() > 1) { @@ -285,9 +285,8 @@ public final class OozieUtil { return new ArrayList<String>(Arrays.asList(missingDependencies)); } - public static List<String> getWorkflowJobs(ColoHelper prismHelper, String bundleID) + public static List<String> getWorkflowJobs(OozieClient oozieClient, String bundleID) throws OozieClientException { - OozieClient oozieClient = prismHelper.getClusterHelper().getOozieClient(); waitForCoordinatorJobCreation(oozieClient, bundleID); List<String> workflowIds = new ArrayList<String>(); List<CoordinatorJob> coordJobs = oozieClient.getBundleJobInfo(bundleID).getCoordinators(); @@ -299,12 +298,11 @@ public final class OozieUtil { return workflowIds; } - public static List<String> getWorkflow(ColoHelper coloHelper, String bundleID) + public static List<String> getWorkflow(OozieClient oozieClient, String bundleID) throws OozieClientException { - OozieClient oozieClient = coloHelper.getClusterHelper().getOozieClient(); waitForCoordinatorJobCreation(oozieClient, bundleID); List<String> workflowIds = new ArrayList<String>(); - String coordId = InstanceUtil.getDefaultCoordIDFromBundle(oozieClient, bundleID); + String coordId = getDefaultCoordIDFromBundle(oozieClient, bundleID); CoordinatorJob coordJobInfo = oozieClient.getCoordJobInfo(coordId); for (CoordinatorAction action : coordJobInfo.getActions()) { if (action.getStatus().name().equals("RUNNING") || action.getStatus().name().equals("SUCCEEDED")) { @@ -317,59 +315,51 @@ public final class OozieUtil { return workflowIds; } - public static Date getNominalTime(ColoHelper prismHelper, String bundleID) + public static Date getNominalTime(OozieClient oozieClient, String bundleID) throws OozieClientException { - OozieClient oozieClient = prismHelper.getClusterHelper().getOozieClient(); BundleJob bundleJob = oozieClient.getBundleJobInfo(bundleID); CoordinatorJob jobInfo = 
oozieClient.getCoordJobInfo(bundleJob.getCoordinators().get(0).getId()); List<CoordinatorAction> actions = jobInfo.getActions(); - return actions.get(0).getNominalTime(); - } - public static CoordinatorJob getDefaultOozieCoord(ColoHelper prismHelper, String bundleId, + public static CoordinatorJob getDefaultOozieCoord(OozieClient oozieClient, String bundleId, EntityType type) throws OozieClientException { - OozieClient client = prismHelper.getClusterHelper().getOozieClient(); - BundleJob bundlejob = client.getBundleJobInfo(bundleId); - + BundleJob bundlejob = oozieClient.getBundleJobInfo(bundleId); for (CoordinatorJob coord : bundlejob.getCoordinators()) { if ((coord.getAppName().contains("DEFAULT") && EntityType.PROCESS == type) || (coord.getAppName().contains("REPLICATION") && EntityType.FEED == type)) { - return client.getCoordJobInfo(coord.getId()); + return oozieClient.getCoordJobInfo(coord.getId()); } else { - LOGGER.info("Desired coord does not exists on " + client.getOozieUrl()); + LOGGER.info("Desired coord does not exists on " + oozieClient.getOozieUrl()); } } - return null; } - public static int getNumberOfWorkflowInstances(ColoHelper prismHelper, String bundleId) + public static int getNumberOfWorkflowInstances(OozieClient oozieClient, String bundleId) throws OozieClientException { - return getDefaultOozieCoord(prismHelper, bundleId, - EntityType.PROCESS).getActions().size(); + return getDefaultOozieCoord(oozieClient, bundleId, EntityType.PROCESS).getActions().size(); } - public static List<String> getActionsNominalTime(ColoHelper prismHelper, - String bundleId, - EntityType type) + public static List<String> getActionsNominalTime(OozieClient oozieClient, + String bundleId, EntityType type) throws OozieClientException { - Map<Date, CoordinatorAction.Status> actions = getActionsNominalTimeAndStatus(prismHelper, bundleId, type); + Map<Date, CoordinatorAction.Status> actions = getActionsNominalTimeAndStatus(oozieClient, bundleId, type); List<String> 
nominalTime = new ArrayList<String>(); for (Date date : actions.keySet()) { nominalTime.add(date.toString()); } return nominalTime; } - public static Map<Date, CoordinatorAction.Status> getActionsNominalTimeAndStatus(ColoHelper prismHelper, + + public static Map<Date, CoordinatorAction.Status> getActionsNominalTimeAndStatus(OozieClient oozieClient, String bundleId, EntityType type) throws OozieClientException { Map<Date, CoordinatorAction.Status> result = new TreeMap<Date, CoordinatorAction.Status>(); - List<CoordinatorAction> actions = getDefaultOozieCoord(prismHelper, - bundleId, type).getActions(); + List<CoordinatorAction> actions = getDefaultOozieCoord(oozieClient, bundleId, type).getActions(); for (CoordinatorAction action : actions) { result.put(action.getNominalTime(), action.getStatus()); } @@ -386,79 +376,54 @@ public final class OozieUtil { BundleJob.Status.SUCCEEDED, BundleJob.Status.KILLED).contains(bundleJob.getStatus())) { return true; } - TimeUtil.sleepSeconds(20); return false; } - public static void verifyNewBundleCreation(ColoHelper cluster, - String originalBundleId, - List<String> - initialNominalTimes, - String entity, - boolean shouldBeCreated, - - boolean matchInstances) throws OozieClientException { + public static void verifyNewBundleCreation(OozieClient oozieClient, String originalBundleId, + List<String> initialNominalTimes, String entity, + boolean shouldBeCreated, boolean matchInstances) + throws OozieClientException { String entityName = Util.readEntityName(entity); EntityType entityType = Util.getEntityType(entity); - String newBundleId = InstanceUtil.getLatestBundleID(cluster, entityName, - entityType); + String newBundleId = getLatestBundleID(oozieClient, entityName, entityType); if (shouldBeCreated) { Assert.assertTrue(!newBundleId.equalsIgnoreCase(originalBundleId), "eeks! 
new bundle is not getting created!!!!"); - LOGGER.info("old bundleId=" + originalBundleId + " on oozie: " - + - "" + cluster.getProcessHelper().getOozieClient().getOozieUrl()); - LOGGER.info("new bundleId=" + newBundleId + " on oozie: " - + - "" + cluster.getProcessHelper().getOozieClient().getOozieUrl()); + LOGGER.info("old bundleId=" + originalBundleId + " on oozie: " + oozieClient); + LOGGER.info("new bundleId=" + newBundleId + " on oozie: " + oozieClient); if (matchInstances) { - validateNumberOfWorkflowInstances(cluster, + validateNumberOfWorkflowInstances(oozieClient, initialNominalTimes, originalBundleId, newBundleId, entityType); } } else { - Assert.assertEquals(newBundleId, - originalBundleId, "eeks! new bundle is getting created!!!!"); + Assert.assertEquals(newBundleId, originalBundleId, "eeks! new bundle is getting created!!!!"); } } - private static void validateNumberOfWorkflowInstances(ColoHelper cluster, + private static void validateNumberOfWorkflowInstances(OozieClient oozieClient, List<String> initialNominalTimes, String originalBundleId, String newBundleId, EntityType type) throws OozieClientException { - - List<String> nominalTimesOriginalAndNew = getActionsNominalTime(cluster, - originalBundleId, type); - - nominalTimesOriginalAndNew.addAll(getActionsNominalTime(cluster, - newBundleId, type)); - + List<String> nominalTimesOriginalAndNew = getActionsNominalTime(oozieClient, originalBundleId, type); + nominalTimesOriginalAndNew.addAll(getActionsNominalTime(oozieClient, newBundleId, type)); initialNominalTimes.removeAll(nominalTimesOriginalAndNew); - if (initialNominalTimes.size() != 0) { LOGGER.info("Missing instance are : " + initialNominalTimes); LOGGER.debug("Original Bundle ID : " + originalBundleId); LOGGER.debug("New Bundle ID : " + newBundleId); - - Assert.assertFalse(true, "some instances have gone missing after " - + - "update"); + Assert.fail("some instances have gone missing after update"); } } - public static String 
getCoordStartTime(ColoHelper colo, String entity, - int bundleNo) + public static String getCoordStartTime(OozieClient oozieClient, String entity, int bundleNo) throws OozieClientException { - final OozieClient oozieClient = colo.getClusterHelper().getOozieClient(); - String bundleID = InstanceUtil.getSequenceBundleID(oozieClient, + String bundleID = getSequenceBundleID(oozieClient, Util.readEntityName(entity), Util.getEntityType(entity), bundleNo); - - CoordinatorJob coord = getDefaultOozieCoord(colo, bundleID, + CoordinatorJob coord = getDefaultOozieCoord(oozieClient, bundleID, Util.getEntityType(entity)); - - return TimeUtil.dateToOozieDate(coord.getStartTime() - ); + return TimeUtil.dateToOozieDate(coord.getStartTime()); } public static DateTimeFormatter getOozieDateTimeFormatter() { @@ -475,11 +440,10 @@ public final class OozieUtil { int instanceNumber) throws OozieClientException, IOException { final OozieClient oozieClient = helper.getClusterHelper().getOozieClient(); - String bundleID = InstanceUtil.getSequenceBundleID(oozieClient, entityName, - type, bundleNumber); + String bundleID = getSequenceBundleID(oozieClient, entityName, type, bundleNumber); List<CoordinatorJob> coords = oozieClient.getBundleJobInfo(bundleID).getCoordinators(); - HadoopUtil.createHDFSFolders(helper, getMissingDependenciesForInstance(oozieClient, coords, - instanceNumber)); + HadoopUtil.createFolders(helper.getClusterHelper().getHadoopFS(), helper.getPrefix(), + getMissingDependenciesForInstance(oozieClient, coords, instanceNumber)); } private static List<String> getMissingDependenciesForInstance(OozieClient oozieClient, @@ -487,7 +451,6 @@ public final class OozieUtil { throws OozieClientException { ArrayList<String> missingPaths = new ArrayList<String>(); for (CoordinatorJob coord : coords) { - CoordinatorJob temp = oozieClient.getCoordJobInfo(coord.getId()); CoordinatorAction instance = temp.getActions().get(instanceNumber); 
missingPaths.addAll(Arrays.asList(instance.getMissingDependencies().split("#"))); @@ -499,10 +462,8 @@ public final class OozieUtil { String entityName, int bundleNumber) throws OozieClientException, IOException { final OozieClient oozieClient = helper.getClusterHelper().getOozieClient(); - String bundleID = InstanceUtil.getSequenceBundleID(oozieClient, entityName, type, - bundleNumber); - List<List<String>> missingDependencies = createMissingDependenciesForBundle(helper, bundleID); - return missingDependencies; + String bundleID = getSequenceBundleID(oozieClient, entityName, type, bundleNumber); + return createMissingDependenciesForBundle(helper, bundleID); } public static List<List<String>> createMissingDependenciesForBundle(ColoHelper helper, String bundleId) @@ -511,7 +472,8 @@ public final class OozieUtil { List<CoordinatorJob> coords = oozieClient.getBundleJobInfo(bundleId).getCoordinators(); List<List<String>> missingDependencies = getMissingDependenciesForBundle(oozieClient, coords); for (List<String> missingDependencyPerInstance : missingDependencies) { - HadoopUtil.createHDFSFolders(helper, missingDependencyPerInstance); + HadoopUtil.createFolders(helper.getClusterHelper().getHadoopFS(), helper.getPrefix(), + missingDependencyPerInstance); } return missingDependencies; } @@ -531,12 +493,216 @@ public final class OozieUtil { return missingDependencies; } - public static void validateRetryAttempts(ColoHelper helper, String bundleId, EntityType type, + public static void validateRetryAttempts(OozieClient oozieClient, String bundleId, EntityType type, int attempts) throws OozieClientException { - OozieClient oozieClient = helper.getClusterHelper().getOozieClient(); - CoordinatorJob coord = getDefaultOozieCoord(helper, bundleId, type); + CoordinatorJob coord = getDefaultOozieCoord(oozieClient, bundleId, type); int actualRun = oozieClient.getJobInfo(coord.getActions().get(0).getExternalId()).getRun(); LOGGER.info("Actual run count: " + actualRun); // wrt 0 
Assert.assertEquals(actualRun, attempts, "Rerun attempts did not match"); } + + public static int checkIfFeedCoordExist(OozieClient oozieClient, + String feedName, String coordType) throws OozieClientException { + LOGGER.info("feedName: " + feedName); + int numberOfCoord = 0; + if (getBundles(oozieClient, feedName, EntityType.FEED).size() == 0) { + return 0; + } + List<String> bundleIds = getBundles(oozieClient, feedName, EntityType.FEED); + LOGGER.info("bundleIds: " + bundleIds); + + for (String aBundleId : bundleIds) { + LOGGER.info("aBundleId: " + aBundleId); + waitForCoordinatorJobCreation(oozieClient, aBundleId); + List<CoordinatorJob> coords = + getBundleCoordinators(oozieClient, aBundleId); + LOGGER.info("coords: " + coords); + for (CoordinatorJob coord : coords) { + if (coord.getAppName().contains(coordType)) { + numberOfCoord++; + } + } + } + return numberOfCoord; + } + + /** + * Retrieves replication coordinatorID from bundle of coordinators. + */ + public static List<String> getReplicationCoordID(String bundleId, AbstractEntityHelper helper) + throws OozieClientException { + final OozieClient oozieClient = helper.getOozieClient(); + List<CoordinatorJob> coords = getBundleCoordinators(oozieClient, bundleId); + List<String> replicationCoordID = new ArrayList<String>(); + for (CoordinatorJob coord : coords) { + if (coord.getAppName().contains("FEED_REPLICATION")) { + replicationCoordID.add(coord.getId()); + } + } + return replicationCoordID; + } + + /** + * Retrieves ID of bundle related to some process/feed using its ordinal number. 
+ * + * @param entityName - name of entity bundle is related to + * @param entityType - feed or process + * @param bundleNumber - ordinal number of bundle + * @return bundle ID + * @throws org.apache.oozie.client.OozieClientException + */ + public static String getSequenceBundleID(OozieClient oozieClient, String entityName, + EntityType entityType, int bundleNumber) throws OozieClientException { + //sequence start from 0 + List<String> bundleIds = getBundles(oozieClient, + entityName, entityType); + Map<Integer, String> bundleMap = new TreeMap<Integer, String>(); + String bundleID; + for (String strID : bundleIds) { + LOGGER.info("getSequenceBundleID: " + strID); + int key = Integer.parseInt(strID.substring(0, strID.indexOf('-'))); + bundleMap.put(key, strID); + } + for (Map.Entry<Integer, String> entry : bundleMap.entrySet()) { + LOGGER.info("Key = " + entry.getKey() + ", Value = " + entry.getValue()); + } + int i = 0; + for (Map.Entry<Integer, String> entry : bundleMap.entrySet()) { + bundleID = entry.getValue(); + if (i == bundleNumber) { + return bundleID; + } + i++; + } + return null; + } + + /** + * Retrieves the latest bundle ID. + * + * @param oozieClient where job is running + * @param entityName name of entity job is related to + * @param entityType type of entity - feed or process expected + * @return latest bundle ID + * @throws org.apache.oozie.client.OozieClientException + */ + public static String getLatestBundleID(OozieClient oozieClient, + String entityName, EntityType entityType) throws OozieClientException { + List<String> bundleIds = getBundles(oozieClient, entityName, entityType); + String max = "0"; + int maxID = -1; + for (String strID : bundleIds) { + if (maxID < Integer.parseInt(strID.substring(0, strID.indexOf('-')))) { + maxID = Integer.parseInt(strID.substring(0, strID.indexOf('-'))); + max = strID; + } + } + return max; + } + + /** + * Retrieves all coordinators of bundle. 
+ * + * @param oozieClient Oozie client to use for fetching info. + * @param bundleID specific bundle ID + * @return list of bundle coordinators + * @throws org.apache.oozie.client.OozieClientException + */ + public static List<CoordinatorJob> getBundleCoordinators(OozieClient oozieClient, String bundleID) + throws OozieClientException { + BundleJob bundleInfo = oozieClient.getBundleJobInfo(bundleID); + return bundleInfo.getCoordinators(); + } + + public static Job.Status getDefaultCoordinatorStatus(OozieClient oozieClient, String processName, + int bundleNumber) throws OozieClientException { + String bundleID = getSequenceBundleID(oozieClient, processName, EntityType.PROCESS, bundleNumber); + String coordId = getDefaultCoordIDFromBundle(oozieClient, bundleID); + return oozieClient.getCoordJobInfo(coordId).getStatus(); + } + + public static String getDefaultCoordIDFromBundle(OozieClient oozieClient, String bundleId) + throws OozieClientException { + waitForCoordinatorJobCreation(oozieClient, bundleId); + BundleJob bundleInfo = oozieClient.getBundleJobInfo(bundleId); + List<CoordinatorJob> coords = bundleInfo.getCoordinators(); + int min = 100000; + String minString = ""; + for (CoordinatorJob coord : coords) { + String strID = coord.getId(); + if (min > Integer.parseInt(strID.substring(0, strID.indexOf('-')))) { + min = Integer.parseInt(strID.substring(0, strID.indexOf('-'))); + minString = coord.getId(); + } + } + LOGGER.info("function getDefaultCoordIDFromBundle: minString: " + minString); + return minString; + } + + public static String getLatestCoordinatorID(OozieClient oozieClient, String processName, + EntityType entityType) throws OozieClientException { + final String latestBundleID = getLatestBundleID(oozieClient, processName, entityType); + return getDefaultCoordIDFromBundle(oozieClient, latestBundleID); + } + + /** + * Waits till bundle job will reach expected status. + * Generates time according to expected status. 
+ * + * @param oozieClient oozieClient of cluster job is running on + * @param processName name of process which job is being analyzed + * @param expectedStatus job status we are waiting for + * @throws org.apache.oozie.client.OozieClientException + */ + public static void waitForBundleToReachState(OozieClient oozieClient, + String processName, Job.Status expectedStatus) throws OozieClientException { + int totalMinutesToWait = getMinutesToWait(expectedStatus); + waitForBundleToReachState(oozieClient, processName, expectedStatus, totalMinutesToWait); + } + + /** + * Waits till bundle job will reach expected status during specific time. + * Use it directly in test cases when timeouts are different from trivial, in other cases use + * waitForBundleToReachState(OozieClient, String, Status) + * + * @param oozieClient oozie client of cluster job is running on + * @param processName name of process which job is being analyzed + * @param expectedStatus job status we are waiting for + * @param totalMinutesToWait specific time to wait expected state + * @throws org.apache.oozie.client.OozieClientException + */ + public static void waitForBundleToReachState(OozieClient oozieClient, String processName, + Job.Status expectedStatus, int totalMinutesToWait) throws OozieClientException { + int sleep = totalMinutesToWait * 60 / 20; + for (int sleepCount = 0; sleepCount < sleep; sleepCount++) { + String bundleID = + getLatestBundleID(oozieClient, processName, EntityType.PROCESS); + BundleJob j = oozieClient.getBundleJobInfo(bundleID); + LOGGER.info(sleepCount + ". Current status: " + j.getStatus() + + "; expected: " + expectedStatus); + if (j.getStatus() == expectedStatus) { + return; + } + TimeUtil.sleepSeconds(20); + } + Assert.fail("State " + expectedStatus + " wasn't reached in " + totalMinutesToWait + " mins"); + } + + /** + * Generates time which is presumably needed for bundle job to reach particular state. 
+ * + * @param expectedStatus status which we are expect to get from bundle job + * @return minutes to wait for expected status + */ + private static int getMinutesToWait(Job.Status expectedStatus) { + switch (expectedStatus) { + case DONEWITHERROR: + case SUCCEEDED: + return OSUtil.IS_WINDOWS ? 40 : 20; + case KILLED: + return OSUtil.IS_WINDOWS ? 30 : 15; + default: + return OSUtil.IS_WINDOWS ? 60 : 30; + } + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java index 02280f3..24af21f 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java @@ -590,14 +590,13 @@ public class AuthorizationTest extends BaseTestClass { AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING); //get old process details - String oldProcessBundleId = InstanceUtil - .getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); - String oldProcessUser = - getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS); + String oldProcessBundleId = OozieUtil + .getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); + String oldProcessUser = getBundleUser(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); //get old feed details - String oldFeedBundleId = InstanceUtil.getLatestBundleID(cluster, feed.getName(), EntityType.FEED); - String oldFeedUser = getBundleUser(cluster, feed.getName(), EntityType.FEED); + String oldFeedBundleId = OozieUtil.getLatestBundleID(clusterOC, 
feed.getName(), EntityType.FEED); + String oldFeedUser = getBundleUser(clusterOC, feed.getName(), EntityType.FEED); //update feed definition FeedMerlin newFeed = new FeedMerlin(feed); @@ -609,15 +608,15 @@ public class AuthorizationTest extends BaseTestClass { AssertUtil.assertSucceeded(serviceResponse); //new feed bundle should be created by U1 - OozieUtil.verifyNewBundleCreation(cluster, oldFeedBundleId, null, newFeed.toString(), true, false); - String newFeedUser = - getBundleUser(cluster, newFeed.getName(), EntityType.FEED); + OozieUtil.verifyNewBundleCreation(clusterOC, oldFeedBundleId, null, newFeed.toString(), true, false); + String newFeedUser = getBundleUser(clusterOC, newFeed.getName(), EntityType.FEED); Assert.assertEquals(oldFeedUser, newFeedUser, "User should be the same"); //new process bundle should be created by U2 - OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, bundles[0].getProcessData(), true, false); + OozieUtil.verifyNewBundleCreation( + clusterOC, oldProcessBundleId, null, bundles[0].getProcessData(), true, false); String newProcessUser = - getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS); + getBundleUser(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); Assert.assertEquals(oldProcessUser, newProcessUser, "User should be the same"); } @@ -645,14 +644,13 @@ public class AuthorizationTest extends BaseTestClass { newFeed.setFeedPathValue(baseTestDir + "/randomPath" + MINUTE_DATE_PATTERN); //get old process details - String oldProcessBundleId = InstanceUtil - .getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); - String oldProcessUser = - getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS); + String oldProcessBundleId = OozieUtil + .getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); + String oldProcessUser = getBundleUser(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); //get old feed details - String 
oldFeedBundleId = InstanceUtil.getLatestBundleID(cluster, feed.getName(), EntityType.FEED); - String oldFeedUser = getBundleUser(cluster, feed.getName(), EntityType.FEED); + String oldFeedBundleId = OozieUtil.getLatestBundleID(clusterOC, feed.getName(), EntityType.FEED); + String oldFeedUser = getBundleUser(clusterOC, feed.getName(), EntityType.FEED); //update feed by U2 serviceResponse = prism.getFeedHelper().update(feed.toString(), newFeed.toString(), @@ -660,15 +658,15 @@ public class AuthorizationTest extends BaseTestClass { AssertUtil.assertSucceeded(serviceResponse); //new feed bundle should be created by U2 - OozieUtil.verifyNewBundleCreation(cluster, oldFeedBundleId, null, newFeed.toString(), true, false); - String newFeedUser = getBundleUser(cluster, newFeed.getName(), EntityType.FEED); + OozieUtil.verifyNewBundleCreation(clusterOC, oldFeedBundleId, null, newFeed.toString(), true, false); + String newFeedUser = getBundleUser(clusterOC, newFeed.getName(), EntityType.FEED); Assert.assertNotEquals(oldFeedUser, newFeedUser, "User should not be the same"); Assert.assertEquals(MerlinConstants.USER2_NAME, newFeedUser); //new process bundle should be created by U2 - OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, bundles[0].getProcessData(), true, false); - String newProcessUser = - getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS); + OozieUtil.verifyNewBundleCreation( + clusterOC, oldProcessBundleId, null, bundles[0].getProcessData(), true, false); + String newProcessUser = getBundleUser(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); Assert.assertEquals(oldProcessUser, newProcessUser, "User should be the same"); } @@ -691,14 +689,12 @@ public class AuthorizationTest extends BaseTestClass { AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING); //get old process details - String oldProcessBundleId = InstanceUtil - .getLatestBundleID(cluster, 
bundles[0].getProcessName(), EntityType.PROCESS); - String oldProcessUser = - getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS); + String oldProcessBundleId = OozieUtil + .getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); + String oldProcessUser = getBundleUser(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); //get old feed details - String oldFeedBundleId = InstanceUtil - .getLatestBundleID(cluster, Util.readEntityName(feed), EntityType.FEED); + String oldFeedBundleId = OozieUtil.getLatestBundleID(clusterOC, Util.readEntityName(feed), EntityType.FEED); //update process by U1 ProcessMerlin processObj = bundles[0].getProcessObject(); @@ -707,12 +703,12 @@ public class AuthorizationTest extends BaseTestClass { AssertUtil.assertSucceeded(serviceResponse); //new feed bundle should not be created - OozieUtil.verifyNewBundleCreation(cluster, oldFeedBundleId, null, feed, false, false); + OozieUtil.verifyNewBundleCreation(clusterOC, oldFeedBundleId, null, feed, false, false); //new process bundle should be created by U1 - OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, bundles[0].getProcessData(), true, false); - String newProcessUser = - getBundleUser(cluster, processObj.getName(), EntityType.PROCESS); + OozieUtil.verifyNewBundleCreation( + clusterOC, oldProcessBundleId, null, bundles[0].getProcessData(), true, false); + String newProcessUser = getBundleUser(clusterOC, processObj.getName(), EntityType.PROCESS); Assert.assertEquals(oldProcessUser, newProcessUser, "User should be the same"); } @@ -735,14 +731,12 @@ public class AuthorizationTest extends BaseTestClass { AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING); //get old process details - String oldProcessBundleId = InstanceUtil - .getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); - String oldProcessUser = - getBundleUser(cluster, 
bundles[0].getProcessName(), EntityType.PROCESS); + String oldProcessBundleId = OozieUtil + .getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); + String oldProcessUser = getBundleUser(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); //get old feed details - String oldFeedBundleId = InstanceUtil - .getLatestBundleID(cluster, Util.readEntityName(feed), EntityType.FEED); + String oldFeedBundleId = OozieUtil.getLatestBundleID(clusterOC, Util.readEntityName(feed), EntityType.FEED); //update process by U2 ProcessMerlin processObj = bundles[0].getProcessObject(); @@ -752,22 +746,20 @@ public class AuthorizationTest extends BaseTestClass { AssertUtil.assertSucceeded(serviceResponse); //new feed bundle should not be created - OozieUtil.verifyNewBundleCreation(cluster, oldFeedBundleId, null, feed, false, false); + OozieUtil.verifyNewBundleCreation(clusterOC, oldFeedBundleId, null, feed, false, false); //new process bundle should be created by U2 - OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, bundles[0].getProcessData(), true, false); - String newProcessUser = - getBundleUser(cluster, processObj.getName(), EntityType.PROCESS); + OozieUtil.verifyNewBundleCreation( + clusterOC, oldProcessBundleId, null, bundles[0].getProcessData(), true, false); + String newProcessUser = getBundleUser(clusterOC, processObj.getName(), EntityType.PROCESS); Assert.assertNotEquals(oldProcessUser, newProcessUser, "User should not be the same"); Assert.assertEquals(MerlinConstants.USER2_NAME, newProcessUser); } - private String getBundleUser(ColoHelper coloHelper, String entityName, EntityType entityType) + private String getBundleUser(OozieClient oozieClient, String entityName, EntityType entityType) throws OozieClientException { - String newBundleId = InstanceUtil.getLatestBundleID(coloHelper, entityName, - entityType); - BundleJob newBundleJob = - coloHelper.getClusterHelper().getOozieClient().getBundleJobInfo(newBundleId); + String 
newBundleId = OozieUtil.getLatestBundleID(oozieClient, entityName, entityType); + BundleJob newBundleJob = oozieClient.getBundleJobInfo(newBundleId); CoordinatorJob coordinatorJob = null; for (CoordinatorJob coord : newBundleJob.getCoordinators()) { if ((entityType == EntityType.PROCESS && coord.getAppName().contains("DEFAULT")) http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java index 33b0e77..8c5d330 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java @@ -24,7 +24,6 @@ import org.apache.falcon.entity.v0.Frequency.TimeUnit; import org.apache.falcon.regression.core.helpers.ColoHelper; import org.apache.falcon.regression.core.util.*; import org.apache.falcon.regression.testHelper.BaseTestClass; -import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.Logger; import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.client.Job; @@ -44,7 +43,6 @@ import java.util.List; public class ELExpCurrentAndLastWeekTest extends BaseTestClass { private ColoHelper cluster = servers.get(0); - private FileSystem clusterFS = serverFS.get(0); private OozieClient clusterOC = serverOC.get(0); private String baseTestDir = cleanAndGetTestDir(); private String aggregateWorkflowDir = baseTestDir + "/aggregator"; @@ -97,13 +95,11 @@ public class ELExpCurrentAndLastWeekTest extends BaseTestClass { public void currentAndLastWeekTest(String startInstance, String endInstance, String firstDep, String endDep) throws 
Exception { bundles[0].setDatasetInstances(startInstance, endInstance); - bundles[0].submitFeedsScheduleProcess(prism); AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); - - List<String> missingDependencies = getMissingDependencies(cluster, bundles[0]); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); + List<String> missingDependencies = getMissingDependencies(clusterOC, bundles[0].getProcessName()); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, bundles[0].getProcessName(), 0); InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); @@ -146,15 +142,14 @@ public class ELExpCurrentAndLastWeekTest extends BaseTestClass { return true; } - private List<String> getMissingDependencies(ColoHelper prismHelper, Bundle bundle) throws OozieClientException { - List<String> bundles = OozieUtil.getBundles(prismHelper.getFeedHelper().getOozieClient(), - bundle.getProcessName(), EntityType.PROCESS); + public List<String> getMissingDependencies(OozieClient oozieClient, + String processName) throws OozieClientException { + List<String> bundles = OozieUtil.getBundles(oozieClient, processName, EntityType.PROCESS); String coordID = bundles.get(0); - List<String> missingDependencies = - OozieUtil.getMissingDependencies(prismHelper, coordID); + List<String> missingDependencies = OozieUtil.getMissingDependencies(oozieClient, coordID); for (int i = 0; i < 10 && missingDependencies == null; ++i) { TimeUtil.sleepSeconds(30); - missingDependencies = OozieUtil.getMissingDependencies(prismHelper, coordID); + missingDependencies = OozieUtil.getMissingDependencies(oozieClient, coordID); } Assert.assertNotNull(missingDependencies, "Missing dependencies not found."); return missingDependencies; 
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java index 07292e1..37f1149 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java @@ -29,6 +29,7 @@ import org.apache.falcon.regression.core.util.TimeUtil; import org.apache.falcon.regression.core.util.Util; import org.apache.falcon.regression.testHelper.BaseTestClass; import org.apache.log4j.Logger; +import org.apache.oozie.client.OozieClient; import org.testng.Assert; import org.testng.TestNGException; import org.testng.annotations.DataProvider; @@ -139,7 +140,7 @@ public class ELValidationsTest extends BaseTestClass { LOGGER.info("processData in try is: " + Util.prettyPrintXml(bundle.getProcessData())); TimeUtil.sleepSeconds(45); if (isMatch) { - getAndMatchDependencies(cluster, bundle); + getAndMatchDependencies(serverOC.get(0), bundle); } return submitResponse; } catch (Exception e) { @@ -151,12 +152,11 @@ public class ELValidationsTest extends BaseTestClass { } } - private void getAndMatchDependencies(ColoHelper prismHelper, Bundle bundle) { + private void getAndMatchDependencies(OozieClient oozieClient, Bundle bundle) { try { List<String> bundles = null; for (int i = 0; i < 10; ++i) { - bundles = OozieUtil.getBundles(prismHelper.getFeedHelper().getOozieClient(), - bundle.getProcessName(), EntityType.PROCESS); + bundles = OozieUtil.getBundles(oozieClient, bundle.getProcessName(), EntityType.PROCESS); if (bundles.size() > 0) { break; } @@ -165,17 +165,16 @@ public class ELValidationsTest extends BaseTestClass 
{ Assert.assertTrue(bundles != null && bundles.size() > 0, "Bundle job not created."); String coordID = bundles.get(0); LOGGER.info("coord id: " + coordID); - List<String> missingDependencies = - OozieUtil.getMissingDependencies(prismHelper, coordID); + List<String> missingDependencies = OozieUtil.getMissingDependencies(oozieClient, coordID); for (int i = 0; i < 10 && missingDependencies == null; ++i) { TimeUtil.sleepSeconds(30); - missingDependencies = OozieUtil.getMissingDependencies(prismHelper, coordID); + missingDependencies = OozieUtil.getMissingDependencies(oozieClient, coordID); } Assert.assertNotNull(missingDependencies, "Missing dependencies not found."); for (String dependency : missingDependencies) { LOGGER.info("dependency from job: " + dependency); } - Date jobNominalTime = OozieUtil.getNominalTime(prismHelper, coordID); + Date jobNominalTime = OozieUtil.getNominalTime(oozieClient, coordID); Calendar time = Calendar.getInstance(); time.setTime(jobNominalTime); LOGGER.info("nominalTime:" + jobNominalTime); @@ -224,8 +223,7 @@ public class ELValidationsTest extends BaseTestClass { } private List<String> getQADepedencyList(Calendar nominalTime, Date startRef, - Date endRef, int frequency, - Bundle bundle) { + Date endRef, int frequency, Bundle bundle) { LOGGER.info("start ref:" + startRef); LOGGER.info("end ref:" + endRef); Calendar initialTime = Calendar.getInstance(); http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java index 69be47a..4fb3c4a 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java +++ 
b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java @@ -33,6 +33,7 @@ import org.apache.falcon.regression.core.util.BundleUtil; import org.apache.falcon.regression.core.util.HadoopUtil; import org.apache.falcon.regression.core.util.InstanceUtil; import org.apache.falcon.regression.core.util.OSUtil; +import org.apache.falcon.regression.core.util.OozieUtil; import org.apache.falcon.regression.core.util.TimeUtil; import org.apache.falcon.regression.core.util.Util; import org.apache.falcon.regression.testHelper.BaseTestClass; @@ -163,7 +164,7 @@ public class EmbeddedPigScriptTest extends BaseTestClass { InstancesResult r = prism.getProcessHelper().getRunningInstance(processName); InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING); int counter = OSUtil.IS_WINDOWS ? 100 : 50; - InstanceUtil.waitForBundleToReachState(cluster, bundles[0].getProcessName(), Job.Status.SUCCEEDED, counter); + OozieUtil.waitForBundleToReachState(clusterOC, bundles[0].getProcessName(), Job.Status.SUCCEEDED, counter); r = prism.getProcessHelper().getRunningInstance(processName); InstanceUtil.validateSuccessWOInstances(r); } http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java index 8eff8e4..05a2b0b 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java @@ -33,6 +33,7 @@ import org.apache.falcon.regression.core.util.HadoopUtil; import org.apache.falcon.regression.core.util.InstanceUtil; import org.apache.falcon.regression.core.util.MatrixUtil; 
import org.apache.falcon.regression.core.util.OSUtil; +import org.apache.falcon.regression.core.util.OozieUtil; import org.apache.falcon.regression.core.util.TimeUtil; import org.apache.falcon.regression.core.util.Util; import org.apache.falcon.regression.testHelper.BaseTestClass; @@ -130,7 +131,6 @@ public class ExternalFSTest extends BaseTestClass{ } - @Test(dataProvider = "getData") public void replicateToExternalFS(final FileSystem externalFS, final String separator, final boolean withData) throws Exception { @@ -181,13 +181,10 @@ public class ExternalFSTest extends BaseTestClass{ Path dstPath = new Path(endpoint + testWasbTargetDir + '/' + timePattern); //check if coordinator exists - InstanceUtil.waitTillInstancesAreCreated(cluster, feed.toString(), 0); - - Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster.getFeedHelper(), Util.readEntityName(feed.toString()), - "REPLICATION"), 1); - + InstanceUtil.waitTillInstancesAreCreated(clusterOC, feed.toString(), 0); + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(clusterOC, feed.getName(), "REPLICATION"), 1); TimeUtil.sleepSeconds(10); + //replication should start, wait while it ends InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(feed.toString()), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
