Repository: falcon Updated Branches: refs/heads/master d9f6c2f18 -> 987364849
FALCON-1009 InstanceUtil cleanup. Contributed by Raghav Kumar Gautam Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/98736484 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/98736484 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/98736484 Branch: refs/heads/master Commit: 9873648498f66a2c476a52416f8b7eb3ae51dadb Parents: d9f6c2f Author: Raghav Kumar Gautam <[email protected]> Authored: Tue Feb 3 12:29:16 2015 -0800 Committer: Raghav Kumar Gautam <[email protected]> Committed: Tue Feb 3 12:29:16 2015 -0800 ---------------------------------------------------------------------- falcon-regression/CHANGES.txt | 2 + .../helpers/entity/AbstractEntityHelper.java | 6 +- .../regression/core/util/InstanceUtil.java | 472 +++++++------------ .../falcon/regression/core/util/OozieUtil.java | 23 +- .../regression/ProcessInstanceKillsTest.java | 3 +- .../regression/ProcessInstanceStatusTest.java | 4 +- .../prism/NewPrismProcessUpdateTest.java | 45 +- 7 files changed, 209 insertions(+), 346 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/98736484/falcon-regression/CHANGES.txt ---------------------------------------------------------------------- diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt index 1c16c75..b79ee14 100644 --- a/falcon-regression/CHANGES.txt +++ b/falcon-regression/CHANGES.txt @@ -40,6 +40,8 @@ Trunk (Unreleased) IMPROVEMENTS + FALCON-1009 InstanceUtil cleanup (Raghav Kumar Gautam) + FALCON-1006 Add a property for inclusion of test (Raghav Kumar Gautam) FALCON-999 Fix PrismProcessResumeTest in falcon regression (Samarth Gupta) http://git-wip-us.apache.org/repos/asf/falcon/blob/98736484/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java index b365a4c..e11b37d 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java @@ -41,7 +41,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.hive.hcatalog.api.HCatClient; import org.apache.hive.hcatalog.common.HCatException; import org.apache.log4j.Logger; -import org.apache.oozie.client.AuthOozieClient; +import org.apache.oozie.client.OozieClient; import org.testng.Assert; import java.io.IOException; @@ -151,14 +151,14 @@ public abstract class AbstractEntityHelper { protected String namenodePrincipal; protected String hiveMetaStorePrincipal; - public AuthOozieClient getOozieClient() { + public OozieClient getOozieClient() { if (null == this.oozieClient) { this.oozieClient = OozieUtil.getClient(this.oozieURL); } return this.oozieClient; } - protected AuthOozieClient oozieClient; + protected OozieClient oozieClient; public FileSystem getHadoopFS() throws IOException { if (null == this.hadoopFS) { http://git-wip-us.apache.org/repos/asf/falcon/blob/98736484/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java index 89b137e..3524355 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java @@ -27,19 +27,17 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.Frequency; -import org.apache.falcon.entity.v0.feed.ACL; import org.apache.falcon.entity.v0.process.Input; import org.apache.falcon.regression.Entities.FeedMerlin; import org.apache.falcon.regression.Entities.ProcessMerlin; import org.apache.falcon.regression.core.bundle.Bundle; -import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants; import org.apache.falcon.regression.core.enumsAndConstants.ResponseErrors; import org.apache.falcon.regression.core.helpers.ColoHelper; import org.apache.falcon.regression.core.helpers.entity.AbstractEntityHelper; +import org.apache.falcon.request.BaseRequest; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesSummaryResult; -import org.apache.falcon.request.BaseRequest; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.http.HttpResponse; import org.apache.log4j.Logger; @@ -59,6 +57,7 @@ import java.lang.reflect.Type; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.EnumSet; import java.util.List; @@ -104,14 +103,15 @@ public final class InstanceUtil { return result; } } - for (String error : new String[] { + final String[] errorStrings = { "(FEED) not found", "is beforePROCESS start", "is after end date", "is after PROCESS's end", "is before PROCESS's start", "is before the entity was scheduled", - }) { + }; + for (String error : errorStrings) { if (responseString.contains(error)) { return result; } @@ -136,40 +136,37 @@ public final class InstanceUtil { /** * Checks if API response reflects success and if it's instances match to expected status. * - * @param r - kind of response from API which should contain information about instances - * @param b - bundle from which process instances are being analyzed - * @param ws - - expected status of instances + * @param instancesResult - kind of response from API which should contain information about + * instances + * @param bundle - bundle from which process instances are being analyzed + * @param wfStatus - - expected status of instances */ - public static void validateSuccess(InstancesResult r, Bundle b, - InstancesResult.WorkflowStatus ws) { - Assert.assertEquals(r.getStatus(), APIResult.Status.SUCCEEDED); - Assert.assertEquals(runningInstancesInResult(r, ws), b.getProcessConcurrency()); + public static void validateSuccess(InstancesResult instancesResult, Bundle bundle, + InstancesResult.WorkflowStatus wfStatus) { + Assert.assertEquals(instancesResult.getStatus(), APIResult.Status.SUCCEEDED); + Assert.assertEquals(instancesInResultWithStatus(instancesResult, wfStatus), + bundle.getProcessConcurrency()); } /** * Check the number of instances in response which have the same status as expected. * - * @param r kind of response from API which should contain information about instances - * @param ws expected status of instances + * @param instancesResult kind of response from API which should contain information about + * instances + * @param workflowStatus expected status of instances * @return number of instances which have expected status */ - public static int runningInstancesInResult(InstancesResult r, - InstancesResult.WorkflowStatus ws) { - InstancesResult.Instance[] pArray = r.getInstances(); - int runningCount = 0; - LOGGER.info("pArray: " + Arrays.toString(pArray)); - for (int instanceIndex = 0; instanceIndex < pArray.length; instanceIndex++) { - LOGGER.info( - "pArray[" + instanceIndex + "]: " + pArray[instanceIndex].getStatus() + " , " - + - pArray[instanceIndex].getInstance() - ); - - if (pArray[instanceIndex].getStatus() == ws) { - runningCount++; - } + public static int instancesInResultWithStatus(InstancesResult instancesResult, + InstancesResult.WorkflowStatus workflowStatus) { + InstancesResult.Instance[] instances = instancesResult.getInstances(); + LOGGER.info("instances: " + Arrays.toString(instances)); + List<InstancesResult.WorkflowStatus> statuses = + new ArrayList<InstancesResult.WorkflowStatus>(); + for (InstancesResult.Instance instance : instances) { + LOGGER.info("instance: " + instance + " status = " + instance.getStatus()); + statuses.add(instance.getStatus()); } - return runningCount; + return Collections.frequency(statuses, workflowStatus); } public static void validateSuccessWOInstances(InstancesResult r) { @@ -177,126 +174,81 @@ public final class InstanceUtil { Assert.assertNull(r.getInstances(), "Unexpected :" + Arrays.toString(r.getInstances())); } - public static void validateError(InstancesResult r, ResponseErrors error) { - Assert.assertTrue(r.getMessage().contains(error.getError()), + public static void validateError(InstancesResult instancesResult, ResponseErrors error) { + Assert.assertTrue(instancesResult.getMessage().contains(error.getError()), "Error should contains '" + error.getError() + "'"); } /** - * Checks that API action succeed and the instance on which it has been performed on has - * expected status. - * - * @param r kind of response from API which should contain information about instance - * @param ws expected status of instance - */ - public static void validateSuccessOnlyStart(InstancesResult r, - InstancesResult.WorkflowStatus ws) { - Assert.assertEquals(r.getStatus(), APIResult.Status.SUCCEEDED); - Assert.assertEquals(runningInstancesInResult(r, ws), 1); - } - - /** * Checks that actual number of instances with different statuses are equal to expected number * of instances with matching statuses. * - * @param r kind of response from API which should contain information about instances - * <p/> - * All parameters below reflect number of expected instances with some kind of status. - * @param totalInstances total instance. - * @param runningInstances number of running instances. - * @param suspendedInstances number of suspended instance. - * @param waitingInstances number of waiting instance. - * @param killedInstances number of killed instance. + * @param instancesResult kind of response from API which should contain information about + * instances <p/> + * All parameters below reflect number of expected instances with some + * kind of status. + * @param totalCount total number of instances. + * @param runningCount number of running instances. + * @param suspendedCount number of suspended instance. + * @param waitingCount number of waiting instance. + * @param killedCount number of killed instance. */ - public static void validateResponse(InstancesResult r, int totalInstances, - int runningInstances, - int suspendedInstances, int waitingInstances, - int killedInstances) { - - int actualRunningInstances = 0; - int actualSuspendedInstances = 0; - int actualWaitingInstances = 0; - int actualKilledInstances = 0; - InstancesResult.Instance[] pArray = r.getInstances(); - LOGGER.info("pArray: " + Arrays.toString(pArray)); - Assert.assertNotNull(pArray, "pArray should be not null"); - Assert.assertEquals(pArray.length, totalInstances, "Total Instances"); - for (int instanceIndex = 0; instanceIndex < pArray.length; instanceIndex++) { - LOGGER.info( - "pArray[" + instanceIndex + "]: " + pArray[instanceIndex].getStatus() + " , " - + - pArray[instanceIndex].getInstance()); - - switch (pArray[instanceIndex].getStatus()) { - case RUNNING: - actualRunningInstances++; - break; - case SUSPENDED: - actualSuspendedInstances++; - break; - case WAITING: - actualWaitingInstances++; - break; - case KILLED: - actualKilledInstances++; - break; - default: - Assert.fail("Unexpected status=" + pArray[instanceIndex].getStatus()); - } - } - Assert.assertEquals(actualRunningInstances, runningInstances, "Running Instances"); - Assert.assertEquals(actualSuspendedInstances, suspendedInstances, "Suspended Instances"); - Assert.assertEquals(actualWaitingInstances, waitingInstances, "Waiting Instances"); - Assert.assertEquals(actualKilledInstances, killedInstances, "Killed Instances"); + public static void validateResponse(InstancesResult instancesResult, int totalCount, + int runningCount, int suspendedCount, int waitingCount, int killedCount) { + InstancesResult.Instance[] instances = instancesResult.getInstances(); + LOGGER.info("instances: " + Arrays.toString(instances)); + Assert.assertNotNull(instances, "instances should be not null"); + Assert.assertEquals(instances.length, totalCount, "Total Instances"); + List<InstancesResult.WorkflowStatus> statuses = new ArrayList<InstancesResult.WorkflowStatus>(); + for (InstancesResult.Instance instance : instances) { + final InstancesResult.WorkflowStatus status = instance.getStatus(); + LOGGER.info("status: "+ status + ", instance: " + instance.getInstance()); + statuses.add(status); + } + + Assert.assertEquals(Collections.frequency(statuses, InstancesResult.WorkflowStatus.RUNNING), + runningCount, "Running Instances"); + Assert.assertEquals(Collections.frequency(statuses, InstancesResult.WorkflowStatus.SUSPENDED), + suspendedCount, "Suspended Instances"); + Assert.assertEquals(Collections.frequency(statuses, InstancesResult.WorkflowStatus.WAITING), + waitingCount, "Waiting Instances"); + Assert.assertEquals(Collections.frequency(statuses, InstancesResult.WorkflowStatus.KILLED), + killedCount, "Killed Instances"); } /** * Checks that expected number of failed instances matches actual number of failed ones. * - * @param r kind of response from API which should contain information about instances. + * @param instancesResult kind of response from API which should contain information about + * instances. * @param failCount number of instances which should be failed. */ - public static void validateFailedInstances(InstancesResult r, int failCount) { - AssertUtil.assertSucceeded(r); + public static void validateFailedInstances(InstancesResult instancesResult, int failCount) { + AssertUtil.assertSucceeded(instancesResult); int counter = 0; - for (InstancesResult.Instance processInstance : r.getInstances()) { - if (processInstance.getStatus() == InstancesResult.WorkflowStatus.FAILED) { + for (InstancesResult.Instance oneInstance : instancesResult.getInstances()) { + if (oneInstance.getStatus() == InstancesResult.WorkflowStatus.FAILED) { counter++; } } Assert.assertEquals(counter, failCount, "Actual number of failed instances does not " - + - "match expected number of failed instances."); + + "match expected number of failed instances."); } public static List<String> getWorkflows(ColoHelper prismHelper, String processName, - WorkflowJob.Status... ws) throws OozieClientException { - - String bundleID = OozieUtil.getBundles(prismHelper.getFeedHelper().getOozieClient(), - processName, EntityType.PROCESS).get(0); + WorkflowJob.Status... statuses) throws OozieClientException { OozieClient oozieClient = prismHelper.getClusterHelper().getOozieClient(); - - List<String> workflows = OozieUtil.getWorkflowJobs(prismHelper, bundleID); + String bundleID = OozieUtil.getBundles(oozieClient, processName, EntityType.PROCESS).get(0); + List<String> workflowJobIds = OozieUtil.getWorkflowJobs(prismHelper, bundleID); List<String> toBeReturned = new ArrayList<String>(); - for (String jobID : workflows) { - WorkflowJob wfJob = oozieClient.getJobInfo(jobID); - LOGGER.info("wa.getExternalId(): " + wfJob.getId() + " wa" - + - ".getExternalStatus" - + - "(): " - + - wfJob.getStartTime()); - LOGGER.info("wf id: " + jobID + " wf status: " + wfJob.getStatus()); - if (ws.length == 0) { - toBeReturned.add(jobID); - } else { - for (WorkflowJob.Status status : ws) { - if (wfJob.getStatus().name().equals(status.name())) { - toBeReturned.add(jobID); - } - } + for (String jobId : workflowJobIds) { + WorkflowJob wfJob = oozieClient.getJobInfo(jobId); + LOGGER.info("wfJob.getId(): " + wfJob.getId() + " wfJob.getStartTime(): " + + wfJob.getStartTime() + + "jobId: " + jobId + " wfJob.getStatus(): " + wfJob.getStatus()); + if (statuses.length == 0 || Arrays.asList(statuses).contains(wfJob.getStatus())) { + toBeReturned.add(jobId); } } return toBeReturned; @@ -304,55 +256,38 @@ public final class InstanceUtil { public static boolean isWorkflowRunning(OozieClient oozieClient, String workflowID) throws OozieClientException { - String status = oozieClient.getJobInfo(workflowID).getStatus().toString(); - return status.equals("RUNNING"); + WorkflowJob.Status status = oozieClient.getJobInfo(workflowID).getStatus(); + return status == WorkflowJob.Status.RUNNING; } - public static void areWorkflowsRunning(OozieClient oozieClient, List<String> wfIDs, - int totalWorkflows, - int runningWorkflows, int killedWorkflows, + public static void areWorkflowsRunning(OozieClient oozieClient, List<String> workflowIds, + int totalWorkflows, int runningWorkflows, int killedWorkflows, int succeededWorkflows) throws OozieClientException { - - List<WorkflowJob> wfJobs = new ArrayList<WorkflowJob>(); - for (String wdID : wfIDs) { - wfJobs.add(oozieClient.getJobInfo(wdID)); - } if (totalWorkflows != -1) { - Assert.assertEquals(wfJobs.size(), totalWorkflows); + Assert.assertEquals(workflowIds.size(), totalWorkflows); } - int actualRunningWorkflows = 0; - int actualKilledWorkflows = 0; - int actualSucceededWorkflows = 0; - LOGGER.info("wfJobs: " + wfJobs); - for (int instanceIndex = 0; instanceIndex < wfJobs.size(); instanceIndex++) { - LOGGER.info("was.get(" + instanceIndex + ").getStatus(): " - + - wfJobs.get(instanceIndex).getStatus()); - - if (wfJobs.get(instanceIndex).getStatus().toString().equals("RUNNING")) { - actualRunningWorkflows++; - } else if (wfJobs.get(instanceIndex).getStatus().toString().equals("KILLED")) { - actualKilledWorkflows++; - } else if (wfJobs.get(instanceIndex).getStatus().toString().equals("SUCCEEDED")) { - actualSucceededWorkflows++; - } + final List<WorkflowJob.Status> statuses = new ArrayList<WorkflowJob.Status>(); + for (String wfId : workflowIds) { + final WorkflowJob.Status status = oozieClient.getJobInfo(wfId).getStatus(); + LOGGER.info("wfId: " + wfId + " status: " + status); + statuses.add(status); } if (runningWorkflows != -1) { - Assert.assertEquals(actualRunningWorkflows, runningWorkflows); + Assert.assertEquals(Collections.frequency(statuses, WorkflowJob.Status.RUNNING), + runningWorkflows, "Number of running jobs doesn't match."); } if (killedWorkflows != -1) { - Assert.assertEquals(actualKilledWorkflows, killedWorkflows); + Assert.assertEquals(Collections.frequency(statuses, WorkflowJob.Status.KILLED), + killedWorkflows, "Number of killed jobs doesn't match."); } if (succeededWorkflows != -1) { - Assert.assertEquals(actualSucceededWorkflows, succeededWorkflows); + Assert.assertEquals(Collections.frequency(statuses, WorkflowJob.Status.SUCCEEDED), + succeededWorkflows, "Number of succeeded jobs doesn't match."); } } public static List<CoordinatorAction> getProcessInstanceList(ColoHelper coloHelper, - String processName, - EntityType entityType) - throws OozieClientException { - + String processName, EntityType entityType) throws OozieClientException { OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient(); String coordId = getLatestCoordinatorID(oozieClient, processName, entityType); //String coordId = getDefaultCoordinatorFromProcessName(processName); @@ -361,15 +296,13 @@ public final class InstanceUtil { } public static String getLatestCoordinatorID(OozieClient oozieClient, String processName, - EntityType entityType) - throws OozieClientException { - return getDefaultCoordIDFromBundle(oozieClient, - getLatestBundleID(oozieClient, processName, entityType)); + EntityType entityType) throws OozieClientException { + final String latestBundleID = getLatestBundleID(oozieClient, processName, entityType); + return getDefaultCoordIDFromBundle(oozieClient, latestBundleID); } public static String getDefaultCoordIDFromBundle(OozieClient oozieClient, String bundleId) throws OozieClientException { - OozieUtil.waitForCoordinatorJobCreation(oozieClient, bundleId); BundleJob bundleInfo = oozieClient.getBundleJobInfo(bundleId); List<CoordinatorJob> coords = bundleInfo.getCoordinators(); @@ -387,48 +320,37 @@ public final class InstanceUtil { } public static int getInstanceCountWithStatus(ColoHelper coloHelper, String processName, - org.apache.oozie.client.CoordinatorAction.Status - status, - EntityType entityType) + org.apache.oozie.client.CoordinatorAction.Status status, EntityType entityType) throws OozieClientException { - List<CoordinatorAction> list = getProcessInstanceList(coloHelper, processName, entityType); - int instanceCount = 0; - for (CoordinatorAction aList : list) { - if (aList.getStatus().equals(status)) { - instanceCount++; - } + List<CoordinatorAction> coordActions = getProcessInstanceList(coloHelper, processName, + entityType); + List<CoordinatorAction.Status> statuses = new ArrayList<CoordinatorAction.Status>(); + for (CoordinatorAction action : coordActions) { + statuses.add(action.getStatus()); } - return instanceCount; + return Collections.frequency(statuses, status); } public static Status getDefaultCoordinatorStatus(ColoHelper colohelper, String processName, int bundleNumber) throws OozieClientException { OozieClient oozieClient = colohelper.getProcessHelper().getOozieClient(); - String coordId = - getDefaultCoordinatorFromProcessName(colohelper, processName, bundleNumber); - return oozieClient.getCoordJobInfo(coordId).getStatus(); - } - - public static String getDefaultCoordinatorFromProcessName( - ColoHelper coloHelper, String processName, int bundleNumber) throws OozieClientException { String bundleID = - getSequenceBundleID(coloHelper, processName, EntityType.PROCESS, bundleNumber); - return getDefaultCoordIDFromBundle(coloHelper.getClusterHelper().getOozieClient(), bundleID); + getSequenceBundleID(oozieClient, processName, EntityType.PROCESS, bundleNumber); + String coordId = getDefaultCoordIDFromBundle(oozieClient, bundleID); + return oozieClient.getCoordJobInfo(coordId).getStatus(); } /** * Retrieves all coordinators of bundle. * + * @param oozieClient Oozie client to use for fetching info. * @param bundleID specific bundle ID - * @param helper entity helper which is related to job * @return list of bundle coordinators * @throws OozieClientException */ - public static List<CoordinatorJob> getBundleCoordinators(String bundleID, - AbstractEntityHelper helper) - throws OozieClientException { - OozieClient localOozieClient = helper.getOozieClient(); - BundleJob bundleInfo = localOozieClient.getBundleJobInfo(bundleID); + public static List<CoordinatorJob> getBundleCoordinators(OozieClient oozieClient, + String bundleID) throws OozieClientException { + BundleJob bundleInfo = oozieClient.getBundleJobInfo(bundleID); return bundleInfo.getCoordinators(); } @@ -444,8 +366,8 @@ public final class InstanceUtil { public static String getLatestBundleID(ColoHelper coloHelper, String entityName, EntityType entityType) throws OozieClientException { - return getLatestBundleID(coloHelper.getFeedHelper().getOozieClient(), - entityName, entityType); + final OozieClient oozieClient = coloHelper.getFeedHelper().getOozieClient(); + return getLatestBundleID(oozieClient, entityName, entityType); } /** @@ -458,10 +380,8 @@ public final class InstanceUtil { * @throws OozieClientException */ public static String getLatestBundleID(OozieClient oozieClient, - String entityName, EntityType entityType) - throws OozieClientException { - List<String> bundleIds = OozieUtil.getBundles(oozieClient, - entityName, entityType); + String entityName, EntityType entityType) throws OozieClientException { + List<String> bundleIds = OozieUtil.getBundles(oozieClient, entityName, entityType); String max = "0"; int maxID = -1; for (String strID : bundleIds) { @@ -482,22 +402,6 @@ public final class InstanceUtil { * @return bundle ID * @throws OozieClientException */ - public static String getSequenceBundleID(ColoHelper prismHelper, String entityName, - EntityType entityType, int bundleNumber) - throws OozieClientException { - return getSequenceBundleID(prismHelper.getClusterHelper().getOozieClient(), entityName, - entityType, bundleNumber); - } - - /** - * Retrieves ID of bundle related to some process/feed using its ordinal number. - * - * @param entityName - name of entity bundle is related to - * @param entityType - feed or process - * @param bundleNumber - ordinal number of bundle - * @return bundle ID - * @throws OozieClientException - */ public static String getSequenceBundleID(OozieClient oozieClient, String entityName, EntityType entityType, int bundleNumber) throws OozieClientException { @@ -532,7 +436,7 @@ public final class InstanceUtil { * @param coloHelper - server from which instance status will be retrieved. * @param processName - name of process which mentioned instance belongs to. * @param bundleNumber - ordinal number of one of the bundle which are related to that - * process. + * process. * @param instanceNumber - ordinal number of instance which state will be returned. * @return - state of mentioned instance. * @throws OozieClientException @@ -541,17 +445,16 @@ public final class InstanceUtil { String processName, int bundleNumber, int instanceNumber) throws OozieClientException { - String bundleID = InstanceUtil - .getSequenceBundleID(coloHelper, processName, EntityType.PROCESS, bundleNumber); + final OozieClient oozieClient = coloHelper.getClusterHelper().getOozieClient(); + String bundleID = + getSequenceBundleID(oozieClient, processName, EntityType.PROCESS, bundleNumber); if (StringUtils.isEmpty(bundleID)) { return null; } - String coordID = InstanceUtil.getDefaultCoordIDFromBundle(coloHelper.getClusterHelper().getOozieClient(), - bundleID); + String coordID = InstanceUtil.getDefaultCoordIDFromBundle(oozieClient, bundleID); if (StringUtils.isEmpty(coordID)) { return null; } - OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient(); CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID); if (coordInfo == null) { return null; @@ -568,10 +471,10 @@ public final class InstanceUtil { /** * Retrieves replication coordinatorID from bundle of coordinators. */ - public static List<String> getReplicationCoordID(String bundlID, - AbstractEntityHelper helper) + public static List<String> getReplicationCoordID(String bundleId, AbstractEntityHelper helper) throws OozieClientException { - List<CoordinatorJob> coords = InstanceUtil.getBundleCoordinators(bundlID, helper); + final OozieClient oozieClient = helper.getOozieClient(); + List<CoordinatorJob> coords = InstanceUtil.getBundleCoordinators(oozieClient, bundleId); List<String> replicationCoordID = new ArrayList<String>(); for (CoordinatorJob coord : coords) { if (coord.getAppName().contains("FEED_REPLICATION")) { @@ -607,9 +510,9 @@ public final class InstanceUtil { */ public static String getFeedPrefix(String feed) { FeedMerlin feedElement = new FeedMerlin(feed); - String p = feedElement.getLocations().getLocations().get(0).getPath(); - p = p.substring(0, p.indexOf('$')); - return p; + String locationPath = feedElement.getLocations().getLocations().get(0).getPath(); + locationPath = locationPath.substring(0, locationPath.indexOf('$')); + return locationPath; } /** @@ -634,8 +537,8 @@ public final class InstanceUtil { } public static org.apache.oozie.client.WorkflowJob.Status getInstanceStatusFromCoord( - ColoHelper ua1, String coordID, int instanceNumber) throws OozieClientException { - OozieClient oozieClient = ua1.getProcessHelper().getOozieClient(); + ColoHelper coloHelper, String coordID, int instanceNumber) throws OozieClientException { + OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient(); CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID); String jobId = coordInfo.getActions().get(instanceNumber).getExternalId(); LOGGER.info("jobId = " + jobId); @@ -693,18 +596,18 @@ public final class InstanceUtil { LOGGER.info("feedName: " + feedName); int numberOfCoord = 0; - if (OozieUtil.getBundles(helper.getOozieClient(), feedName, EntityType.FEED).size() == 0) { + final OozieClient oozieClient = helper.getOozieClient(); + if (OozieUtil.getBundles(oozieClient, feedName, EntityType.FEED).size() == 0) { return 0; } - List<String> bundleID = - OozieUtil.getBundles(helper.getOozieClient(), feedName, EntityType.FEED); - LOGGER.info("bundleID: " + bundleID); + List<String> bundleIds = OozieUtil.getBundles(oozieClient, feedName, EntityType.FEED); + LOGGER.info("bundleIds: " + bundleIds); - for (String aBundleID : bundleID) { - LOGGER.info("aBundleID: " + aBundleID); - OozieUtil.waitForCoordinatorJobCreation(helper.getOozieClient(), aBundleID); + for (String aBundleId : bundleIds) { + LOGGER.info("aBundleId: " + aBundleId); + OozieUtil.waitForCoordinatorJobCreation(oozieClient, aBundleId); List<CoordinatorJob> coords = - InstanceUtil.getBundleCoordinators(aBundleID, helper); + InstanceUtil.getBundleCoordinators(oozieClient, aBundleId); LOGGER.info("coords: " + coords); for (CoordinatorJob coord : coords) { if (coord.getAppName().contains(coordType)) { @@ -720,12 +623,9 @@ public final class InstanceUtil { * * @return modified process definition */ - public static String setProcessFrequency(String process, - Frequency frequency) { + public static String setProcessFrequency(String process, Frequency frequency) { ProcessMerlin p = new ProcessMerlin(process); - p.setFrequency(frequency); - return p.toString(); } @@ -734,9 +634,7 @@ public final class InstanceUtil { */ public static String setProcessName(String process, String newName) { ProcessMerlin p = new ProcessMerlin(process); - p.setName(newName); - return p.toString(); } @@ -766,39 +664,32 @@ public final class InstanceUtil { throws OozieClientException { OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient(); List<CoordinatorAction> list = new ArrayList<CoordinatorAction>(); - LOGGER.info("bundle size for process is " - + - OozieUtil.getBundles(coloHelper.getFeedHelper().getOozieClient(), processName, - entityType).size()); - for (String bundleId : OozieUtil.getBundles(coloHelper.getFeedHelper().getOozieClient(), - processName, entityType)) { + final List<String> bundleIds = OozieUtil.getBundles(oozieClient, processName, entityType); + LOGGER.info("bundle size for process is " + bundleIds.size()); + for (String bundleId : bundleIds) { BundleJob bundleInfo = oozieClient.getBundleJobInfo(bundleId); - List<CoordinatorJob> coords = bundleInfo.getCoordinators(); - LOGGER.info("number of coords in bundle " + bundleId + "=" + coords.size()); - for (CoordinatorJob coord : coords) { + List<CoordinatorJob> coordJobs = bundleInfo.getCoordinators(); + LOGGER.info("number of coordJobs in bundle " + bundleId + "=" + coordJobs.size()); + for (CoordinatorJob coordJob : coordJobs) { List<CoordinatorAction> actions = - oozieClient.getCoordJobInfo(coord.getId()).getActions(); - LOGGER.info("number of actions in coordinator " + coord.getId() + " is " - + - actions.size()); + oozieClient.getCoordJobInfo(coordJob.getId()).getActions(); + LOGGER.info("number of actions in coordinator " + coordJob.getId() + " is " + + actions.size()); list.addAll(actions); } } - String coordId = getLatestCoordinatorID(coloHelper.getClusterHelper().getOozieClient(), processName, - entityType); + String coordId = getLatestCoordinatorID(oozieClient, processName, entityType); LOGGER.info("default coordID: " + coordId); return list; } public static String getOutputFolderForInstanceForReplication(ColoHelper coloHelper, - String coordID, - int instanceNumber) - throws OozieClientException { + String coordID, int instanceNumber) throws OozieClientException { OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient(); CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID); - return InstanceUtil.getReplicatedFolderFromInstanceRunConf( - oozieClient.getJobInfo(coordInfo.getActions().get(instanceNumber).getExternalId()) - .getConf()); + final CoordinatorAction coordAction = coordInfo.getActions().get(instanceNumber); + final String actionConf = oozieClient.getJobInfo(coordAction.getExternalId()).getConf(); + return InstanceUtil.getReplicatedFolderFromInstanceRunConf(actionConf); } private static String getReplicatedFolderFromInstanceRunConf( @@ -815,9 +706,9 @@ public final class InstanceUtil { OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient(); CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID); - return InstanceUtil.getReplicatedFolderBaseFromInstanceRunConf( - oozieClient.getJobInfo(coordInfo.getActions().get(instanceNumber).getExternalId()) - .getConf()); + final CoordinatorAction coordAction = coordInfo.getActions().get(instanceNumber); + final String actionConf = oozieClient.getJobInfo(coordAction.getExternalId()).getConf(); + return InstanceUtil.getReplicatedFolderBaseFromInstanceRunConf(actionConf); } private static String getReplicatedFolderBaseFromInstanceRunConf(String runConf) { @@ -833,7 +724,7 @@ public final class InstanceUtil { * * @param client oozie client to retrieve info about instances * @param entityName name of feed or process - * @param instancesNumber instance number for which we wait to reach the required status + * @param instancesNumber instance number for which we wait to reach the required status * @param expectedStatus expected status we are waiting for * @param entityType type of entity - feed or process expected * @param totalMinutesToWait time in minutes for which instance state should be polled @@ -1052,52 +943,29 @@ public final class InstanceUtil { * Use this method directly in unusual test cases where timeouts are different from trivial. * In other cases use waitTillInstancesAreCreated(ColoHelper,String,int) * - * @param coloHelper colo helper of cluster job is running on + * @param oozieClient oozie client of the cluster on which job is running * @param entity definition of entity which describes job * @param bundleSeqNo bundle number if update has happened. * @throws OozieClientException */ - public static void waitTillInstancesAreCreated(ColoHelper coloHelper, + public static void waitTillInstancesAreCreated(OozieClient oozieClient, String entity, int bundleSeqNo, int totalMinutesToWait ) throws OozieClientException { String entityName = Util.readEntityName(entity); EntityType type = Util.getEntityType(entity); - waitTillInstancesAreCreated(coloHelper, entityName, type, bundleSeqNo, totalMinutesToWait); - } - - /** - * Waits till instances of specific job will be created during specific time. - * Use this method directly in unusual test cases where timeouts are different from trivial. - * In other cases use waitTillInstancesAreCreated(ColoHelper,String,int) - * - * @param coloHelper colo helper of cluster job is running on - * @param entityName name of entity job is related to - * @param type type of entity - * @param bundleSeqNo bundle number if update has happened. - * @throws OozieClientException - */ - public static void waitTillInstancesAreCreated(ColoHelper coloHelper, - String entityName, - EntityType type, - int bundleSeqNo, - int totalMinutesToWait - ) throws OozieClientException { - String bundleID = getSequenceBundleID(coloHelper, entityName, type, - bundleSeqNo); - String coordID = getDefaultCoordIDFromBundle(coloHelper.getClusterHelper().getOozieClient(), bundleID); + String bundleID = getSequenceBundleID(oozieClient, entityName, + type, bundleSeqNo); + String coordID = getDefaultCoordIDFromBundle(oozieClient, bundleID); for (int sleepCount = 0; sleepCount < totalMinutesToWait; sleepCount++) { - CoordinatorJob coordInfo = coloHelper.getProcessHelper().getOozieClient() - .getCoordJobInfo(coordID); + CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID); if (coordInfo.getActions().size() > 0) { break; } LOGGER.info("Coord " + coordInfo.getId() + " still doesn't have " - + - "instance created on oozie: " + coloHelper.getProcessHelper() - .getOozieClient().getOozieUrl()); + + "instance created on oozie: " + oozieClient.getOozieUrl()); TimeUtil.sleepSeconds(5); } } @@ -1116,22 +984,8 @@ public final class InstanceUtil { int bundleSeqNo ) throws OozieClientException { int sleep = INSTANCES_CREATED_TIMEOUT * 60 / 5; - waitTillInstancesAreCreated(coloHelper, entity, bundleSeqNo, sleep); - } - - public static String setFeedACL(String feed, String... ownerGroup) { - FeedMerlin feedObject = new FeedMerlin(feed); - ACL acl = feedObject.getACL(); - acl.setOwner(MerlinConstants.ACL_OWNER); - acl.setGroup(MerlinConstants.ACL_GROUP); - if (ownerGroup.length > 0) { - acl.setOwner(ownerGroup[0]); - if (ownerGroup.length == 2) { - acl.setGroup(ownerGroup[1]); - } - } - feedObject.setACL(acl); - return feedObject.toString(); + final OozieClient oozieClient = coloHelper.getClusterHelper().getOozieClient(); + waitTillInstancesAreCreated(oozieClient, entity, bundleSeqNo, sleep); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/98736484/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java index 3098729..d74864f 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java @@ -27,7 +27,6 @@ import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.OozieClientException; import org.apache.oozie.client.Job; import org.apache.oozie.client.CoordinatorJob; -import org.apache.oozie.client.XOozieClient; import org.joda.time.DateTime; import org.apache.log4j.Logger; import org.joda.time.DateTimeZone; @@ -204,7 +203,7 @@ public final class OozieUtil { throws OozieClientException { List<DateTime> startTimes = new ArrayList<DateTime>(); - XOozieClient oozieClient = prismHelper.getClusterHelper().getOozieClient(); + OozieClient oozieClient = prismHelper.getClusterHelper().getOozieClient(); BundleJob bundleJob = oozieClient.getBundleJobInfo(bundleID); CoordinatorJob jobInfo; @@ -295,7 +294,7 @@ public final class OozieUtil { public static List<String> getWorkflowJobs(ColoHelper prismHelper, String bundleID) throws OozieClientException { - XOozieClient oozieClient = prismHelper.getClusterHelper().getOozieClient(); + OozieClient oozieClient = prismHelper.getClusterHelper().getOozieClient(); waitForCoordinatorJobCreation(oozieClient, bundleID); List<String> workflowIds = new ArrayList<String>(); List<CoordinatorJob> coordJobs = oozieClient.getBundleJobInfo(bundleID).getCoordinators(); @@ -309,7 +308,7 @@ public final class OozieUtil { public static Date getNominalTime(ColoHelper prismHelper, String bundleID) throws OozieClientException { - XOozieClient oozieClient = prismHelper.getClusterHelper().getOozieClient(); + OozieClient oozieClient = prismHelper.getClusterHelper().getOozieClient(); BundleJob bundleJob = oozieClient.getBundleJobInfo(bundleID); CoordinatorJob jobInfo = oozieClient.getCoordJobInfo(bundleJob.getCoordinators().get(0).getId()); @@ -322,7 +321,7 @@ public final class OozieUtil { public static CoordinatorJob getDefaultOozieCoord(ColoHelper prismHelper, String bundleId, EntityType type) throws OozieClientException { - XOozieClient client = prismHelper.getClusterHelper().getOozieClient(); + OozieClient client = prismHelper.getClusterHelper().getOozieClient(); BundleJob bundlejob = client.getBundleJobInfo(bundleId); for (CoordinatorJob coord : bundlejob.getCoordinators()) { @@ -368,7 +367,7 @@ public final class OozieUtil { public static boolean isBundleOver(ColoHelper coloHelper, String bundleId) throws OozieClientException { - XOozieClient client = coloHelper.getClusterHelper().getOozieClient(); + OozieClient client = coloHelper.getClusterHelper().getOozieClient(); BundleJob bundleJob = client.getBundleJobInfo(bundleId); @@ -440,7 +439,8 @@ public final class OozieUtil { public static String getCoordStartTime(ColoHelper colo, String entity, int bundleNo) throws OozieClientException { - String bundleID = InstanceUtil.getSequenceBundleID(colo, + final OozieClient oozieClient = colo.getClusterHelper().getOozieClient(); + String bundleID = InstanceUtil.getSequenceBundleID(oozieClient, Util.readEntityName(entity), Util.getEntityType(entity), bundleNo); CoordinatorJob coord = getDefaultOozieCoord(colo, bundleID, @@ -463,8 +463,9 @@ public final class OozieUtil { String entityName, int bundleNumber, int instanceNumber) throws OozieClientException, IOException { - String bundleID = InstanceUtil.getSequenceBundleID(helper, entityName, type, bundleNumber); - OozieClient oozieClient = helper.getClusterHelper().getOozieClient(); + final OozieClient oozieClient = helper.getClusterHelper().getOozieClient(); + String bundleID = InstanceUtil.getSequenceBundleID(oozieClient, entityName, + type, bundleNumber); List<CoordinatorJob> coords = oozieClient.getBundleJobInfo(bundleID).getCoordinators(); HadoopUtil.createHDFSFolders(helper, getMissingDependenciesForInstance(oozieClient, coords, instanceNumber)); @@ -486,7 +487,9 @@ public final class OozieUtil { public static void createMissingDependencies(ColoHelper helper, EntityType type, String entityName, int bundleNumber) throws OozieClientException, IOException { - String bundleID = InstanceUtil.getSequenceBundleID(helper, entityName, type, bundleNumber); + final OozieClient oozieClient = helper.getClusterHelper().getOozieClient(); + String bundleID = InstanceUtil.getSequenceBundleID(oozieClient, entityName, type, + bundleNumber); createMissingDependenciesForBundle(helper, bundleID); } http://git-wip-us.apache.org/repos/asf/falcon/blob/98736484/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java index 64347af..a45911f 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java @@ -152,7 +152,8 @@ public class ProcessInstanceKillsTest extends BaseTestClass { InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); //create data for first 5 instances, 6th should be non-materialized - String bundleId = InstanceUtil.getSequenceBundleID(cluster, processName, EntityType.PROCESS, 0); + String bundleId = InstanceUtil.getSequenceBundleID(clusterOC, processName, + EntityType.PROCESS, 0); for(CoordinatorJob c : clusterOC.getBundleJobInfo(bundleId).getCoordinators()) { List<CoordinatorAction> actions = clusterOC.getCoordJobInfo(c.getId()).getActions(); if (actions.size() == 6) { http://git-wip-us.apache.org/repos/asf/falcon/blob/98736484/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java index 08a88a3..adbfd2c 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java @@ -32,6 +32,7 @@ import org.apache.falcon.regression.core.util.TimeUtil; import org.apache.falcon.regression.core.util.HadoopUtil; import org.apache.falcon.regression.core.util.AssertUtil; import org.apache.falcon.regression.testHelper.BaseTestClass; +import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesResult.WorkflowStatus; import org.apache.hadoop.fs.FileSystem; @@ -245,7 +246,8 @@ public class ProcessInstanceStatusTest extends BaseTestClass { TimeUtil.sleepSeconds(TIMEOUT); InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, "?start=2010-01-02T01:00Z"); - InstanceUtil.validateSuccessOnlyStart(r, WorkflowStatus.SUSPENDED); + Assert.assertEquals(r.getStatus(), APIResult.Status.SUCCEEDED); + Assert.assertEquals(InstanceUtil.instancesInResultWithStatus(r, WorkflowStatus.SUSPENDED), 1); } /** http://git-wip-us.apache.org/repos/asf/falcon/blob/98736484/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java index 1f40e96..4bd91e4 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java @@ -79,6 +79,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { private ColoHelper cluster2 = servers.get(1); private ColoHelper cluster3 = servers.get(2); private FileSystem cluster1FS = serverFS.get(0); + private OozieClient cluster1OC = serverOC.get(0); private OozieClient cluster2OC = serverOC.get(1); private OozieClient cluster3OC = serverOC.get(2); private static final Logger LOGGER = Logger.getLogger(NewPrismProcessUpdateTest.class); @@ -131,7 +132,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { .getLatestBundleID(cluster3, Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); - InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, @@ -162,7 +163,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), true, false); waitingForBundleFinish(cluster3, oldBundleId, 5); - InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10); OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), true, true); @@ -180,7 +181,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); TimeUtil.sleepSeconds(30); - InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, @@ -359,7 +360,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { AssertUtil.assertSucceeded(response); OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), true, false); - InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10); String prismString = getResponse(prism, bundles[1].getProcessData(), true); Assert.assertEquals(Util.getProcessObject(prismString).getFrequency(), @@ -492,7 +493,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); TimeUtil.sleepSeconds(30); - InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, @@ -582,7 +583,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS); AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); - InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, @@ -668,7 +669,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); - InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, @@ -769,7 +770,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); TimeUtil.sleepSeconds(30); - InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, @@ -837,7 +838,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); TimeUtil.sleepSeconds(30); - InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, @@ -911,7 +912,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); TimeUtil.sleepSeconds(30); - InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, @@ -939,7 +940,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { } OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), true, false); - InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10); bundles[1].verifyDependencyListing(cluster2); @@ -992,7 +993,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), true, false); - InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10); AssertUtil.assertSucceeded(cluster3.getProcessHelper().resume(bundles[1].getProcessData())); @@ -1026,13 +1027,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(originalProcess)); TimeUtil.sleepSeconds(30); - InstanceUtil.waitTillInstancesAreCreated(cluster3, originalProcess, 0, 10); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, originalProcess, 0, 10); String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, Util.readEntityName(originalProcess), EntityType.PROCESS); - InstanceUtil.waitTillInstancesAreCreated(cluster3, originalProcess, 0, 10); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, originalProcess, 0, 10); List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS); @@ -1073,7 +1074,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { updatedProcess, true, false); waitingForBundleFinish(cluster3, oldBundleId); - InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10); OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), true, true); @@ -1088,7 +1089,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); TimeUtil.sleepSeconds(30); - InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, @@ -1218,7 +1219,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { //now to schedule in 1 colo and let it remain in another AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); - InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, @@ -1240,7 +1241,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { ServiceResponse response = prism.getProcessHelper().update(updatedProcess, updatedProcess); AssertUtil.assertSucceeded(response); - InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 1, 10); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10); String prismString = dualComparison(prism, cluster2, bundles[1].getProcessData()); Assert.assertEquals(Util.getProcessObject(prismString).getFrequency(), @@ -1270,7 +1271,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); TimeUtil.sleepSeconds(30); - InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, @@ -1314,7 +1315,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); TimeUtil.sleepSeconds(30); - InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); String oldBundleId = InstanceUtil .getLatestBundleID(cluster3, @@ -1422,7 +1423,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity() .getEnd() )); - InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0, 10); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); @@ -1460,7 +1461,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { TimeUtil.getTimeWrtSystemTime(15)); b.submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster1, b.getProcessData(), 0, 10); + InstanceUtil.waitTillInstancesAreCreated(cluster1OC, b.getProcessData(), 0, 10); OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS, b.getProcessName(), 0); InstanceUtil.waitTillInstanceReachState(serverOC.get(0),
