http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java deleted file mode 100644 index b07e275..0000000 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java +++ /dev/null @@ -1,855 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.regression.core.util; - -import com.google.gson.GsonBuilder; -import com.google.gson.JsonDeserializationContext; -import com.google.gson.JsonDeserializer; -import com.google.gson.JsonElement; -import com.google.gson.JsonSyntaxException; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.regression.core.bundle.Bundle; -import org.apache.falcon.regression.core.enumsAndConstants.ResponseErrors; -import org.apache.falcon.regression.core.helpers.entity.AbstractEntityHelper; -import org.apache.falcon.request.BaseRequest; -import org.apache.falcon.resource.APIResult; -import org.apache.falcon.resource.FeedInstanceResult; -import org.apache.falcon.resource.InstanceDependencyResult; -import org.apache.falcon.resource.InstancesResult; -import org.apache.falcon.resource.InstancesSummaryResult; -import org.apache.falcon.resource.SchedulableEntityInstance; -import org.apache.falcon.resource.TriageResult; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.authentication.client.AuthenticationException; -import org.apache.http.HttpResponse; -import org.apache.log4j.Logger; -import org.apache.oozie.client.BundleJob; -import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.client.CoordinatorJob; -import org.apache.oozie.client.Job.Status; -import org.apache.oozie.client.OozieClient; -import org.apache.oozie.client.OozieClientException; -import org.apache.oozie.client.WorkflowJob; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.json.JSONException; -import org.testng.Assert; - -import java.io.IOException; -import java.lang.reflect.Type; -import java.net.URISyntaxException; -import java.text.ParseException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Date; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -/** - * util functions related to instanceTest. - */ -public final class InstanceUtil { - - public static final int INSTANCES_CREATED_TIMEOUT = OSUtil.IS_WINDOWS ? 20 : 10; - private static final Logger LOGGER = Logger.getLogger(InstanceUtil.class); - private static final EnumSet<Status> LIVE_STATUS = EnumSet.of(Status.RUNNING, - Status.PREP, Status.SUCCEEDED, Status.SUSPENDED); - - private InstanceUtil() { - throw new AssertionError("Instantiating utility class..."); - } - - public static APIResult sendRequestProcessInstance(String url, String user) - throws IOException, URISyntaxException, AuthenticationException, InterruptedException { - return hitUrl(url, Util.getMethodType(url), user); - } - - public static APIResult hitUrl(String url, - String method, String user) throws URISyntaxException, - IOException, AuthenticationException, InterruptedException { - BaseRequest request = new BaseRequest(url, method, user); - HttpResponse response = request.run(); - String responseString = IOUtils.toString(response.getEntity().getContent(), "UTF-8"); - LOGGER.info("The web service response is:\n" + Util.prettyPrintXmlOrJson(responseString)); - APIResult result; - if (url.contains("/summary/")) { - result = new InstancesSummaryResult(APIResult.Status.FAILED, responseString); - }else if (url.contains("/listing/")) { - result = new FeedInstanceResult(APIResult.Status.FAILED, responseString); - }else if (url.contains("instance/dependencies")) { - result = new InstanceDependencyResult(APIResult.Status.FAILED, responseString); - }else if (url.contains("instance/triage")) { - result = new TriageResult(APIResult.Status.FAILED, responseString); - }else { - result = new InstancesResult(APIResult.Status.FAILED, responseString); - } - Assert.assertNotNull(result, "APIResult is null"); - for (ResponseErrors error : ResponseErrors.values()) { - if (responseString.contains(error.getError())) { - return result; - } - } - final String[] errorStrings = { - "(FEED) not found", - "is beforePROCESS start", - "is after end date", - "is after PROCESS's end", - "is before PROCESS's start", - "is before the entity was scheduled", - }; - for (String error : errorStrings) { - if (responseString.contains(error)) { - return result; - } - } - try { - result = new GsonBuilder().registerTypeAdapter(Date.class, new JsonDeserializer<Date>() { - @Override - public Date deserialize(JsonElement json, Type t, JsonDeserializationContext c) { - return new DateTime(json.getAsString()).toDate(); - } - }).create().fromJson(responseString, getClassOfResult(url)); - } catch (JsonSyntaxException e) { - Assert.fail("Not a valid json:\n" + responseString); - } - LOGGER.info("statusCode: " + response.getStatusLine().getStatusCode()); - LOGGER.info("message: " + result.getMessage()); - LOGGER.info("APIResult.Status: " + result.getStatus()); - return result; - } - - /** - * Returns API result class matching to API request url. - */ - private static Class<? extends APIResult> getClassOfResult(String url) { - final Class<? extends APIResult> classOfResult; - if (url.contains("/listing/")) { - classOfResult = FeedInstanceResult.class; - } else if (url.contains("/summary/")) { - classOfResult = InstancesSummaryResult.class; - } else if (url.contains("instance/dependencies")) { - classOfResult = InstanceDependencyResult.class; - } else if (url.contains("instance/triage")) { - classOfResult = TriageResult.class; - } else { - classOfResult = InstancesResult.class; - } - return classOfResult; - } - - /** - * Checks if API response reflects success and if it's instances match to expected status. - * - * @param instancesResult - kind of response from API which should contain information about - * instances - * @param bundle - bundle from which process instances are being analyzed - * @param wfStatus - - expected status of instances - */ - public static void validateSuccess(InstancesResult instancesResult, Bundle bundle, - InstancesResult.WorkflowStatus wfStatus) { - Assert.assertEquals(instancesResult.getStatus(), APIResult.Status.SUCCEEDED); - Assert.assertEquals(instancesInResultWithStatus(instancesResult, wfStatus), - bundle.getProcessConcurrency()); - } - - /** - * Check the number of instances in response which have the same status as expected. - * - * @param instancesResult kind of response from API which should contain information about - * instances - * @param workflowStatus expected status of instances - * @return number of instances which have expected status - */ - public static int instancesInResultWithStatus(InstancesResult instancesResult, - InstancesResult.WorkflowStatus workflowStatus) { - InstancesResult.Instance[] instances = instancesResult.getInstances(); - LOGGER.info("instances: " + Arrays.toString(instances)); - List<InstancesResult.WorkflowStatus> statuses = - new ArrayList<>(); - for (InstancesResult.Instance instance : instances) { - LOGGER.info("instance: " + instance + " status = " + instance.getStatus()); - statuses.add(instance.getStatus()); - } - return Collections.frequency(statuses, workflowStatus); - } - - /** - * Validates that response doesn't contains instances. - * @param r response - */ - public static void validateSuccessWOInstances(InstancesResult r) { - AssertUtil.assertSucceeded(r); - Assert.assertNull(r.getInstances(), "Unexpected :" + Arrays.toString(r.getInstances())); - } - - /** - * Validates that failed response contains specific error message. - * @param instancesResult response - * @param error expected error - */ - public static void validateError(InstancesResult instancesResult, ResponseErrors error) { - Assert.assertTrue(instancesResult.getMessage().contains(error.getError()), - "Error should contains '" + error.getError() + "'"); - } - - /** - * Checks that actual number of instances with different statuses are equal to expected number - * of instances with matching statuses. - * - * @param instancesResult kind of response from API which should contain information about - * instances <p/> - * All parameters below reflect number of expected instances with some - * kind of status. - * @param totalCount total number of instances. - * @param runningCount number of running instances. - * @param suspendedCount number of suspended instance. - * @param waitingCount number of waiting instance. - * @param killedCount number of killed instance. - */ - public static void validateResponse(InstancesResult instancesResult, int totalCount, - int runningCount, int suspendedCount, int waitingCount, int killedCount) { - InstancesResult.Instance[] instances = instancesResult.getInstances(); - LOGGER.info("instances: " + Arrays.toString(instances)); - Assert.assertNotNull(instances, "instances should be not null"); - Assert.assertEquals(instances.length, totalCount, "Total Instances"); - List<InstancesResult.WorkflowStatus> statuses = new ArrayList<>(); - for (InstancesResult.Instance instance : instances) { - final InstancesResult.WorkflowStatus status = instance.getStatus(); - LOGGER.info("status: " + status + ", instance: " + instance.getInstance()); - statuses.add(status); - } - Assert.assertEquals(Collections.frequency(statuses, InstancesResult.WorkflowStatus.RUNNING), - runningCount, "Running Instances"); - Assert.assertEquals(Collections.frequency(statuses, InstancesResult.WorkflowStatus.SUSPENDED), - suspendedCount, "Suspended Instances"); - Assert.assertEquals(Collections.frequency(statuses, InstancesResult.WorkflowStatus.WAITING), - waitingCount, "Waiting Instances"); - Assert.assertEquals(Collections.frequency(statuses, InstancesResult.WorkflowStatus.KILLED), - killedCount, "Killed Instances"); - } - - /** - * Retrieves workflow IDs from every instances from response. - * @param instancesResult response - * @return list of workflow IDs - */ - public static List<String> getWorkflowJobIds(InstancesResult instancesResult) { - InstancesResult.Instance[] instances = instancesResult.getInstances(); - LOGGER.info("Instances: " + Arrays.toString(instances)); - Assert.assertNotNull(instances, "Instances should be not null"); - List<String> wfIds = new ArrayList<>(); - for (InstancesResult.Instance instance : instances) { - LOGGER.warn(String.format( - "instance: %s, status: %s, logs : %s", instance, instance.getStatus(), instance.getLogFile())); - if (instance.getStatus().name().equals("RUNNING") || instance.getStatus().name().equals("SUCCEEDED")) { - wfIds.add(instance.getLogFile()); - } - if (instance.getStatus().name().equals("KILLED") || instance.getStatus().name().equals("WAITING")) { - Assert.assertNull(instance.getLogFile()); - } - } - return wfIds; - } - - /** - * Checks that expected number of failed instances matches actual number of failed ones. - * - * @param instancesResult kind of response from API which should contain information about - * instances. - * @param failCount number of instances which should be failed. - */ - public static void validateFailedInstances(InstancesResult instancesResult, int failCount) { - AssertUtil.assertSucceeded(instancesResult); - int counter = 0; - for (InstancesResult.Instance oneInstance : instancesResult.getInstances()) { - if (oneInstance.getStatus() == InstancesResult.WorkflowStatus.FAILED) { - counter++; - } - } - Assert.assertEquals(counter, failCount, "Actual number of failed instances does not " - + "match to expected number of failed instances."); - } - - /** - * Gets process workflows by given statuses. - * @param oozieClient oozie client of cluster where process is running - * @param processName process name - * @param statuses statuses workflows will be selected by - * @return list of matching workflows - * @throws OozieClientException - */ - public static List<String> getWorkflows(OozieClient oozieClient, String processName, - WorkflowJob.Status... statuses) throws OozieClientException { - String bundleID = OozieUtil.getBundles(oozieClient, processName, EntityType.PROCESS).get(0); - List<String> workflowJobIds = OozieUtil.getWorkflowJobs(oozieClient, bundleID); - - List<String> toBeReturned = new ArrayList<>(); - for (String jobId : workflowJobIds) { - WorkflowJob wfJob = oozieClient.getJobInfo(jobId); - LOGGER.info("wfJob.getId(): " + wfJob.getId() + " wfJob.getStartTime(): " - + wfJob.getStartTime() + "jobId: " + jobId + " wfJob.getStatus(): " + wfJob.getStatus()); - if (statuses.length == 0 || Arrays.asList(statuses).contains(wfJob.getStatus())) { - toBeReturned.add(jobId); - } - } - return toBeReturned; - } - - public static boolean isWorkflowRunning(OozieClient oozieClient, String workflowID) throws - OozieClientException { - WorkflowJob.Status status = oozieClient.getJobInfo(workflowID).getStatus(); - return status == WorkflowJob.Status.RUNNING; - } - - public static void areWorkflowsRunning(OozieClient oozieClient, List<String> workflowIds, - int totalWorkflows, int runningWorkflows, int killedWorkflows, - int succeededWorkflows) throws OozieClientException { - if (totalWorkflows != -1) { - Assert.assertEquals(workflowIds.size(), totalWorkflows); - } - final List<WorkflowJob.Status> statuses = new ArrayList<>(); - for (String wfId : workflowIds) { - final WorkflowJob.Status status = oozieClient.getJobInfo(wfId).getStatus(); - LOGGER.info("wfId: " + wfId + " status: " + status); - statuses.add(status); - } - if (runningWorkflows != -1) { - Assert.assertEquals(Collections.frequency(statuses, WorkflowJob.Status.RUNNING), - runningWorkflows, "Number of running jobs doesn't match."); - } - if (killedWorkflows != -1) { - Assert.assertEquals(Collections.frequency(statuses, WorkflowJob.Status.KILLED), - killedWorkflows, "Number of killed jobs doesn't match."); - } - if (succeededWorkflows != -1) { - Assert.assertEquals(Collections.frequency(statuses, WorkflowJob.Status.SUCCEEDED), - succeededWorkflows, "Number of succeeded jobs doesn't match."); - } - } - - public static List<CoordinatorAction> getProcessInstanceList(OozieClient oozieClient, - String processName, EntityType entityType) throws OozieClientException { - String coordId = OozieUtil.getLatestCoordinatorID(oozieClient, processName, entityType); - //String coordId = getDefaultCoordinatorFromProcessName(processName); - LOGGER.info("default coordID: " + coordId); - return oozieClient.getCoordJobInfo(coordId).getActions(); - } - - public static int getInstanceCountWithStatus(OozieClient oozieClient, String processName, - CoordinatorAction.Status status, EntityType entityType) throws OozieClientException { - List<CoordinatorAction> coordActions = getProcessInstanceList(oozieClient, processName, entityType); - List<CoordinatorAction.Status> statuses = new ArrayList<>(); - for (CoordinatorAction action : coordActions) { - statuses.add(action.getStatus()); - } - return Collections.frequency(statuses, status); - } - - /** - * Retrieves status of one instance. - * - * @param oozieClient - server from which instance status will be retrieved. - * @param processName - name of process which mentioned instance belongs to. - * @param bundleNumber - ordinal number of one of the bundle which are related to that - * process. - * @param instanceNumber - ordinal number of instance which state will be returned. - * @return - state of mentioned instance. - * @throws OozieClientException - */ - public static CoordinatorAction.Status getInstanceStatus(OozieClient oozieClient, String processName, - int bundleNumber, int instanceNumber) throws OozieClientException { - String bundleID = OozieUtil.getSequenceBundleID(oozieClient, processName, EntityType.PROCESS, bundleNumber); - if (StringUtils.isEmpty(bundleID)) { - return null; - } - String coordID = OozieUtil.getDefaultCoordIDFromBundle(oozieClient, bundleID); - if (StringUtils.isEmpty(coordID)) { - return null; - } - CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID); - if (coordInfo == null) { - return null; - } - LOGGER.info("coordInfo = " + coordInfo); - List<CoordinatorAction> actions = coordInfo.getActions(); - if (actions.size() == 0) { - return null; - } - LOGGER.info("actions = " + actions); - return actions.get(instanceNumber).getStatus(); - } - - /** - * Forms and sends process instance request based on url of action to be performed and it's - * parameters. - * - * @param colo - servers on which action should be performed - * @param user - whose credentials will be used for this action - * @return result from API - */ - public static APIResult createAndSendRequestProcessInstance(String url, String params, String colo, String user) - throws IOException, URISyntaxException, AuthenticationException, InterruptedException { - if (params != null && !colo.equals("")) { - url = url + params + "&" + colo.substring(1); - } else if (params != null) { - url = url + params; - } else { - url = url + colo; - } - return sendRequestProcessInstance(url, user); - } - - public static org.apache.oozie.client.WorkflowJob.Status getInstanceStatusFromCoord( - OozieClient oozieClient, String coordID, int instanceNumber) throws OozieClientException { - CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID); - String jobId = coordInfo.getActions().get(instanceNumber).getExternalId(); - LOGGER.info("jobId = " + jobId); - if (jobId == null) { - return null; - } - WorkflowJob actionInfo = oozieClient.getJobInfo(jobId); - return actionInfo.getStatus(); - } - - public static List<String> getInputFoldersForInstanceForReplication( - OozieClient oozieClient, String coordID, int instanceNumber) throws OozieClientException { - CoordinatorAction x = oozieClient.getCoordActionInfo(coordID + "@" + instanceNumber); - String jobId = x.getExternalId(); - WorkflowJob wfJob = oozieClient.getJobInfo(jobId); - return getReplicationFolderFromInstanceRunConf(wfJob.getConf()); - } - - private static List<String> getReplicationFolderFromInstanceRunConf(String runConf) { - String conf; - conf = runConf.substring(runConf.indexOf("falconInPaths</name>") + 20); - conf = conf.substring(conf.indexOf("<value>") + 7); - conf = conf.substring(0, conf.indexOf("</value>")); - return new ArrayList<>(Arrays.asList(conf.split(","))); - } - - public static int getInstanceRunIdFromCoord(OozieClient oozieClient, String coordID, int instanceNumber) - throws OozieClientException { - CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID); - WorkflowJob actionInfo = oozieClient.getJobInfo(coordInfo.getActions().get(instanceNumber).getExternalId()); - return actionInfo.getRun(); - } - - public static int checkIfFeedCoordExist(AbstractEntityHelper helper, - String feedName, String coordType) throws OozieClientException { - LOGGER.info("feedName: " + feedName); - int numberOfCoord = 0; - - final OozieClient oozieClient = helper.getOozieClient(); - if (OozieUtil.getBundles(oozieClient, feedName, EntityType.FEED).size() == 0) { - return 0; - } - List<String> bundleIds = OozieUtil.getBundles(oozieClient, feedName, EntityType.FEED); - LOGGER.info("bundleIds: " + bundleIds); - - for (String bundleId : bundleIds) { - LOGGER.info("bundleId: " + bundleId); - OozieUtil.waitForCoordinatorJobCreation(oozieClient, bundleId); - List<CoordinatorJob> coords = - OozieUtil.getBundleCoordinators(oozieClient, bundleId); - LOGGER.info("coords: " + coords); - for (CoordinatorJob coord : coords) { - if (coord.getAppName().contains(coordType)) { - numberOfCoord++; - } - } - } - return numberOfCoord; - } - - public static List<CoordinatorAction> getProcessInstanceListFromAllBundles( - OozieClient oozieClient, String processName, EntityType entityType) - throws OozieClientException { - List<CoordinatorAction> list = new ArrayList<>(); - final List<String> bundleIds = OozieUtil.getBundles(oozieClient, processName, entityType); - LOGGER.info("bundle size for process is " + bundleIds.size()); - for (String bundleId : bundleIds) { - BundleJob bundleInfo = oozieClient.getBundleJobInfo(bundleId); - List<CoordinatorJob> coordJobs = bundleInfo.getCoordinators(); - LOGGER.info("number of coordJobs in bundle " + bundleId + "=" + coordJobs.size()); - for (CoordinatorJob coordJob : coordJobs) { - List<CoordinatorAction> actions = - oozieClient.getCoordJobInfo(coordJob.getId()).getActions(); - LOGGER.info("number of actions in coordinator " + coordJob.getId() + " is " - + actions.size()); - list.addAll(actions); - } - } - String coordId = OozieUtil.getLatestCoordinatorID(oozieClient, processName, entityType); - LOGGER.info("default coordID: " + coordId); - return list; - } - - public static String getOutputFolderForInstanceForReplication(OozieClient oozieClient, - String coordID, int instanceNumber) throws OozieClientException { - CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID); - final CoordinatorAction coordAction = coordInfo.getActions().get(instanceNumber); - final String actionConf = oozieClient.getJobInfo(coordAction.getExternalId()).getConf(); - return getReplicatedFolderFromInstanceRunConf(actionConf); - } - - private static String getReplicatedFolderFromInstanceRunConf(String runConf) { - String inputPathExample = getReplicationFolderFromInstanceRunConf(runConf).get(0); - String postFix = inputPathExample.substring(inputPathExample.length() - 7, inputPathExample.length()); - return getReplicatedFolderBaseFromInstanceRunConf(runConf) + postFix; - } - - public static String getOutputFolderBaseForInstanceForReplication( - OozieClient oozieClient, String coordID, int instanceNumber) throws OozieClientException { - CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID); - final CoordinatorAction coordAction = coordInfo.getActions().get(instanceNumber); - final String actionConf = oozieClient.getJobInfo(coordAction.getExternalId()).getConf(); - return getReplicatedFolderBaseFromInstanceRunConf(actionConf); - } - - private static String getReplicatedFolderBaseFromInstanceRunConf(String runConf) { - String conf = runConf.substring(runConf.indexOf("distcpTargetPaths</name>") + 24); - conf = conf.substring(conf.indexOf("<value>") + 7); - conf = conf.substring(0, conf.indexOf("</value>")); - return conf; - } - - /** - * Waits till supplied number of instances of process/feed reach expected state during - * specific time. - * - * @param client oozie client to retrieve info about instances - * @param entityName name of feed or process - * @param instancesNumber instance number for which we wait to reach the required status - * @param expectedStatus expected status we are waiting for - * @param entityType type of entity - feed or process expected - * @param totalMinutesToWait time in minutes for which instance state should be polled - * @throws OozieClientException - */ - public static void waitTillInstanceReachState(OozieClient client, String entityName, int instancesNumber, - CoordinatorAction.Status expectedStatus, EntityType entityType, int totalMinutesToWait) - throws OozieClientException { - String filter; - // get the bundle ids - if (entityType.equals(EntityType.FEED)) { - filter = "name=FALCON_FEED_" + entityName; - } else { - filter = "name=FALCON_PROCESS_" + entityName; - } - List<BundleJob> bundleJobs = new ArrayList<>(); - for (int retries = 0; retries < 20; ++retries) { - bundleJobs = OozieUtil.getBundles(client, filter, 0, 10); - if (bundleJobs.size() > 0) { - break; - } - TimeUtil.sleepSeconds(5); - } - if (bundleJobs.size() == 0) { - Assert.fail("Could not retrieve bundles"); - } - List<String> bundleIds = OozieUtil.getBundleIds(bundleJobs); - Collections.sort(bundleIds, Collections.reverseOrder()); - String coordId = null; - for (String bundleId : bundleIds) { - LOGGER.info(String.format("Using bundle %s", bundleId)); - final Status status = client.getBundleJobInfo(bundleId).getStatus(); - Assert.assertTrue(LIVE_STATUS.contains(status), - String.format("Bundle job %s is should be prep/running but is %s", bundleId, status)); - OozieUtil.waitForCoordinatorJobCreation(client, bundleId); - List<CoordinatorJob> coords = client.getBundleJobInfo(bundleId).getCoordinators(); - List<String> cIds = new ArrayList<>(); - if (entityType == EntityType.PROCESS) { - for (CoordinatorJob coord : coords) { - cIds.add(coord.getId()); - } - coordId = OozieUtil.getMinId(cIds); - break; - } else { - for (CoordinatorJob coord : coords) { - if (coord.getAppName().contains("FEED_REPLICATION")) { - cIds.add(coord.getId()); - } - } - if (!cIds.isEmpty()) { - coordId = cIds.get(0); - break; - } - } - } - Assert.assertNotNull(coordId, "Coordinator id not found"); - LOGGER.info(String.format("Using coordinator id: %s", coordId)); - int maxTries = 50; - int totalSleepTime = totalMinutesToWait * 60; - int sleepTime = totalSleepTime / maxTries; - LOGGER.info(String.format("Sleep for %d seconds", sleepTime)); - for (int i = 0; i < maxTries; i++) { - LOGGER.info(String.format("Try %d of %d", (i + 1), maxTries)); - CoordinatorJob coordinatorJob = client.getCoordJobInfo(coordId); - final Status coordinatorStatus = coordinatorJob.getStatus(); - if (expectedStatus != CoordinatorAction.Status.TIMEDOUT){ - Assert.assertTrue(LIVE_STATUS.contains(coordinatorStatus), - String.format("Coordinator %s should be running/prep but is %s.", coordId, coordinatorStatus)); - } - List<CoordinatorAction> coordinatorActions = coordinatorJob.getActions(); - int instanceWithStatus = 0; - for (CoordinatorAction coordinatorAction : coordinatorActions) { - LOGGER.info(String.format("Coordinator Action %s status is %s on oozie %s", - coordinatorAction.getId(), coordinatorAction.getStatus(), client.getOozieUrl())); - if (expectedStatus == coordinatorAction.getStatus()) { - instanceWithStatus++; - } - } - if (instanceWithStatus >= instancesNumber) { - return; - } else { - TimeUtil.sleepSeconds(sleepTime); - } - } - Assert.fail("expected state of instance was never reached"); - } - - /** - * Waits till supplied number of instances of process/feed reach expected state during - * specific time. - * - * @param client oozie client to retrieve info about instances - * @param entityName name of feed or process - * @param numberOfInstance number of instances which status we are waiting for - * @param expectedStatus expected status we are waiting for - * @param entityType type of entity - feed or process expected - */ - public static void waitTillInstanceReachState(OozieClient client, String entityName, - int numberOfInstance, - CoordinatorAction.Status expectedStatus, - EntityType entityType) - throws OozieClientException { - int totalMinutesToWait = getMinutesToWait(entityType, expectedStatus); - waitTillInstanceReachState(client, entityName, numberOfInstance, expectedStatus, - entityType, totalMinutesToWait); - } - - /** - * Generates time which is presumably needed for process/feed instances to reach particular - * state. - * Feed instances are running faster then process, so feed timeouts are less then process. - * - * @param entityType type of entity which instances status we are waiting for - * @param expectedStatus expected status we are waiting for - * @return minutes to wait for expected status - */ - private static int getMinutesToWait(EntityType entityType, CoordinatorAction.Status expectedStatus) { - switch (expectedStatus) { - case RUNNING: - if (entityType == EntityType.PROCESS) { - return OSUtil.IS_WINDOWS ? 20 : 10; - } else if (entityType == EntityType.FEED) { - return OSUtil.IS_WINDOWS ? 10 : 5; - } - case WAITING: - return OSUtil.IS_WINDOWS ? 6 : 3; - case SUCCEEDED: - if (entityType == EntityType.PROCESS) { - return OSUtil.IS_WINDOWS ? 25 : 15; - } else if (entityType == EntityType.FEED) { - return OSUtil.IS_WINDOWS ? 20 : 10; - } - case KILLED: - case TIMEDOUT: - return OSUtil.IS_WINDOWS ? 40 : 20; - default: - return OSUtil.IS_WINDOWS ? 30 : 15; - } - } - - /** - * Waits till instances of specific job will be created during specific time. - * Use this method directly in unusual test cases where timeouts are different from trivial. - * In other cases use waitTillInstancesAreCreated(OozieClient,String,int) - * - * @param oozieClient oozie client of the cluster on which job is running - * @param entity definition of entity which describes job - * @param bundleSeqNo bundle number if update has happened. - * @throws OozieClientException - */ - public static void waitTillInstancesAreCreated(OozieClient oozieClient, String entity, int bundleSeqNo, - int totalMinutesToWait) throws OozieClientException { - String entityName = Util.readEntityName(entity); - EntityType type = Util.getEntityType(entity); - String bundleID = OozieUtil.getSequenceBundleID(oozieClient, entityName, - type, bundleSeqNo); - String coordID = OozieUtil.getDefaultCoordIDFromBundle(oozieClient, bundleID); - for (int sleepCount = 0; sleepCount < totalMinutesToWait; sleepCount++) { - CoordinatorJob coordInfo = oozieClient.getCoordJobInfo(coordID); - - if (coordInfo.getActions().size() > 0) { - break; - } - LOGGER.info("Coord " + coordInfo.getId() + " still doesn't have " - + "instance created on oozie: " + oozieClient.getOozieUrl()); - TimeUtil.sleepSeconds(5); - } - } - - /** - * Waits till instances of specific job will be created during timeout. - * Timeout is common for most of usual test cases. - * - * @param oozieClient oozieClient of cluster job is running on - * @param entity definition of entity which describes job - * @param bundleSeqNo bundle number if update has happened. - * @throws OozieClientException - */ - public static void waitTillInstancesAreCreated(OozieClient oozieClient, String entity, int bundleSeqNo - ) throws OozieClientException { - int sleep = INSTANCES_CREATED_TIMEOUT * 60 / 5; - waitTillInstancesAreCreated(oozieClient, entity, bundleSeqNo, sleep); - } - - /** - * Asserts instances of specific job will be present for given instanceTime. - * - * @param instancesResult InstanceDependencyResult - * @param oozieClient oozieClient of cluster job is running on - * @param bundleID bundleId of job - * @param time instanceTime. - * @throws JSONException - * @throws ParseException - */ - public static void assertProcessInstances(InstanceDependencyResult instancesResult, OozieClient oozieClient, - String bundleID, String time) - throws OozieClientException, ParseException, JSONException { - List<String> inputPath = new ArrayList<>(); - List<String> outputPath = new ArrayList<>(); - SchedulableEntityInstance[] instances = instancesResult.getDependencies(); - LOGGER.info("instances: " + Arrays.toString(instances)); - Assert.assertNotNull(instances, "instances should be not null"); - for (SchedulableEntityInstance instance : instances) { - Assert.assertNotNull(instance.getCluster()); - Assert.assertNotNull(instance.getEntityName()); - Assert.assertNotNull(instance.getEntityType()); - Assert.assertNotNull(instance.getInstanceTime()); - Assert.assertNotNull(instance.getTags()); - if (instance.getTags().equals("Input")) { - inputPath.add(new DateTime(instance.getInstanceTime(), DateTimeZone.UTC).toString()); - } - if (instance.getTags().equals("Output")) { - outputPath.add(new DateTime(instance.getInstanceTime(), DateTimeZone.UTC).toString()); - } - } - - List<String> inputActual = getMinuteDatesToPath(inputPath.get(inputPath.indexOf( - Collections.min(inputPath))), inputPath.get(inputPath.indexOf(Collections.max(inputPath))), 5); - List<String> outputActual = getMinuteDatesToPath(outputPath.get(outputPath.indexOf(Collections.min( - outputPath))), outputPath.get(outputPath.indexOf(Collections.max(outputPath))), 5); - - Configuration conf = OozieUtil.getProcessConf(oozieClient, bundleID, time); - Assert.assertNotNull(conf, "Configuration should not be null"); - List<String> inputExp = Arrays.asList(conf.get("inputData").split(",")); - List<String> outputExp = Arrays.asList(conf.get("outputData").split(",")); - - Assert.assertTrue(matchList(inputExp, inputActual), " Inputs dont match"); - Assert.assertTrue(matchList(outputExp, outputActual), " Outputs dont match"); - - } - - /** - * Returns list of path based on given start and end time. - * - * @param startOozieDate start date - * @param endOozieDate end date - * @param minuteSkip difference between paths - * @throws ParseException - */ - public static List<String> getMinuteDatesToPath(String startOozieDate, String endOozieDate, - int minuteSkip) throws ParseException { - String myFormat = "yyyy'-'MM'-'dd'T'HH':'mm'Z'"; - String userFormat = "yyyy'-'MM'-'dd'T'HH':'mm':'ss'.'SSS'Z'"; - return TimeUtil.getMinuteDatesOnEitherSide(TimeUtil.parseDate(startOozieDate, myFormat, userFormat), - TimeUtil.parseDate(endOozieDate, myFormat, userFormat), minuteSkip); - } - - /** - * Parses date from one format to another. - * - * @param oozieDate input date - * @throws ParseException - */ - public static String getParsedDates(String oozieDate) throws ParseException { - String myFormat = "yyyy'-'MM'-'dd'T'HH':'mm'Z'"; - String userFormat = "yyyy'-'MM'-'dd'T'HH':'mm':'ss'.'SSS'Z'"; - return TimeUtil.parseDate(oozieDate, myFormat, userFormat); - } - - /** - * Asserts Whether two list are equal or not. - * - * @param firstList list<String> to be comapred - * @param secondList list<String> to be compared - */ - public static boolean matchList(List<String> firstList, List<String> secondList) { - Collections.sort(firstList); - Collections.sort(secondList); - if (firstList.size() != secondList.size()) { - return false; - } - for (int index = 0; index < firstList.size(); index++) { - if (!firstList.get(index).contains(secondList.get(index))) { - return false; - } - } - return true; - } - - /** - * Asserts instanceDependencyResult of specific job for a given feed. - * - * @param instancesResult InstanceDependencyResult - * @param processName process name for given bundle - * @param tag Input/Output - * @param expectedInstances instance for given instanceTime. - * @throws ParseException - */ - public static void assertFeedInstances(InstanceDependencyResult instancesResult, String processName, String tag, - List<String> expectedInstances) throws ParseException { - List<String> actualInstances = new ArrayList<>(); - SchedulableEntityInstance[] instances = instancesResult.getDependencies(); - LOGGER.info("instances: " + Arrays.toString(instances)); - Assert.assertNotNull(instances, "instances should be not null"); - for (SchedulableEntityInstance instance : instances) { - Assert.assertNotNull(instance.getCluster()); - Assert.assertNotNull(instance.getEntityName()); - Assert.assertNotNull(instance.getEntityType()); - Assert.assertNotNull(instance.getInstanceTime()); - Assert.assertNotNull(instance.getTags()); - Assert.assertTrue(instance.getEntityType().toString().equals("PROCESS"), "Type should be PROCESS"); - Assert.assertTrue(instance.getEntityName().equals(processName), "Expected name is : " + processName); - Assert.assertTrue(instance.getTags().equals(tag)); - actualInstances.add(getParsedDates(new DateTime(instance.getInstanceTime(), DateTimeZone.UTC).toString())); - } - - Set<String> expectedInstancesSet = new HashSet<>(expectedInstances); - Set<String> actualInstancesSet = new HashSet<>(actualInstances); - Assert.assertEquals(expectedInstancesSet, actualInstancesSet, "Instances don't match"); - } -} -
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/KerberosHelper.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/KerberosHelper.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/KerberosHelper.java deleted file mode 100644 index 9d028fa..0000000 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/KerberosHelper.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.regression.core.util; - -import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants; -import org.apache.hadoop.security.UserGroupInformation; - -import java.io.IOException; - -/** - * Util methods for Kerberos. - */ -public final class KerberosHelper { - private KerberosHelper() { - throw new AssertionError("Instantiating utility class..."); - } - - public static UserGroupInformation getUGI(String user) throws IOException { - // if unsecure cluster create a remote user object - if (!MerlinConstants.IS_SECURE) { - return UserGroupInformation.createRemoteUser(user); - } - // if secure create a ugi object from keytab - return UserGroupInformation.loginUserFromKeytabAndReturnUGI(getPrincipal(user), - getKeyTab(user)); - } - - private static String getKeyTab(String user) { - return MerlinConstants.getKeytabForUser(user); - } - - private static String getPrincipal(String user) { - return MerlinConstants.USER_REALM.isEmpty() ? user : user + '@' + MerlinConstants - .USER_REALM; - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/LogUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/LogUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/LogUtil.java deleted file mode 100644 index e587704..0000000 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/LogUtil.java +++ /dev/null @@ -1,344 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.regression.core.util; - -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.falcon.regression.core.helpers.ColoHelper; -import org.apache.log4j.Logger; -import org.apache.oozie.client.BundleJob; -import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.client.CoordinatorJob; -import org.apache.oozie.client.OozieClient; -import org.apache.oozie.client.OozieClientException; -import org.apache.oozie.client.WorkflowAction; -import org.apache.oozie.client.WorkflowJob; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.PrintStream; -import java.util.Collection; -import java.util.List; - -/** - * Util class for managing logs. - */ -public final class LogUtil { - private static final Logger LOGGER = Logger.getLogger(LogUtil.class); - private static final String NL = System.getProperty("line.separator"); - private static final String HR = StringUtils.repeat("-", 80); - private static final String HR_2 = StringUtils.repeat("-", 120); - private static final String HR_3 = StringUtils.repeat("-", 160); - - private LogUtil() { - throw new AssertionError("Instantiating utility class..."); - } - - private enum OozieDump { - BundleDump { - @Override - void writeLogs(OozieClient oozieClient, String location, Collection<File> filter) { - final List<BundleJob> bundleJobsInfo; - try { - bundleJobsInfo = oozieClient.getBundleJobsInfo("", 0, 1000000); - } catch (OozieClientException e) { - LOGGER.error("Couldn't fetch list of bundles. Exception: " + e); - return; - } - for (BundleJob oneJobInfo : bundleJobsInfo) { - final String bundleJobId = oneJobInfo.getId(); - if (!skipInfo()) { - writeOneJobInfo(oozieClient, bundleJobId, location, filter); - } - if (!skipLog()) { - writeOneJobLog(oozieClient, bundleJobId, location, filter); - } - } - } - - /** - * Pull and dump info of one job. - * @param oozieClient oozie client that will be used for pulling log - * @param bundleJobId job id of the bundle job - * @param location local location where logs will be dumped - * @param filter list of files that have already been dumped - */ - private void writeOneJobInfo(OozieClient oozieClient, String bundleJobId, - String location, Collection<File> filter) { - final String fileName = OSUtil.concat(location, bundleJobId + "-info.log"); - final File file = new File(fileName); - if (filter != null && filter.contains(file)) { - return; - } - final BundleJob info; - try { - info = oozieClient.getBundleJobInfo(bundleJobId); - } catch (OozieClientException e) { - LOGGER.error("Couldn't fetch bundle info for " + bundleJobId + ". " - + "Exception: " + e); - return; - } - StringBuilder sb = new StringBuilder(); - sb.append("Bundle ID : ").append(info.getId()).append(NL); - sb.append(HR).append(NL); - sb.append("Bundle Name : ").append(info.getAppName()).append(NL); - sb.append("App Path : ").append(info.getAppPath()).append(NL); - sb.append("Status : ").append(info.getStatus()).append(NL); - sb.append("User : ").append(info.getUser()).append(NL); - sb.append("Created : ").append(info.getCreatedTime()).append(NL); - sb.append("Started : ").append(info.getStartTime()).append(NL); - sb.append("EndTime : ").append(info.getEndTime()).append(NL); - sb.append("Kickoff time : ").append(info.getKickoffTime()).append(NL); - sb.append(HR_2).append(NL); - final String format = "%-40s %-10s %-5s %-10s %-30s %-20s"; - sb.append(String.format(format, - "Job ID", "Status", "Freq", "Unit", "Started", "Next Materialized")).append(NL); - sb.append(HR_2).append(NL); - for (CoordinatorJob cj : info.getCoordinators()) { - sb.append(String.format(format, - cj.getId(), cj.getStatus(), cj.getFrequency(), cj.getTimeUnit(), cj.getStartTime(), - cj.getNextMaterializedTime())).append(NL); - } - sb.append(HR_2).append(NL); - try { - FileUtils.writeStringToFile(file, sb.toString()); - } catch (IOException e) { - LOGGER.error("Couldn't write bundle info for " + bundleJobId + ". " - + "Exception: " + e); - } - } - }, - - CoordDump { - @Override - void writeLogs(OozieClient oozieClient, String location, Collection<File> filter) { - final List<CoordinatorJob> coordJobsInfo; - try { - coordJobsInfo = oozieClient.getCoordJobsInfo("", 0, 1000000); - } catch (OozieClientException e) { - LOGGER.error("Couldn't fetch list of bundles. Exception: " + e); - return; - } - for (CoordinatorJob oneJobInfo : coordJobsInfo) { - final String coordJobId = oneJobInfo.getId(); - if (!skipInfo()) { - writeOneJobInfo(oozieClient, coordJobId, location, filter); - } - if (!skipLog()) { - writeOneJobLog(oozieClient, coordJobId, location, filter); - } - } - } - - /** - * Pull and dump info of one job. - * @param oozieClient oozie client that will be used for pulling log - * @param coordJobId job id of the coordinator job - * @param location local location where logs will be dumped - * @param filter list of files that have already been dumped - */ - private void writeOneJobInfo(OozieClient oozieClient, String coordJobId, - String location, Collection<File> filter) { - final String fileName = OSUtil.concat(location, coordJobId + "-info.log"); - final File file = new File(fileName); - if (filter != null && filter.contains(file)) { - return; - } - final CoordinatorJob info; - try { - info = oozieClient.getCoordJobInfo(coordJobId); - } catch (OozieClientException e) { - LOGGER.error("Couldn't fetch bundle info for " + coordJobId + ". " - + "Exception: " + e); - return; - } - StringBuilder sb = new StringBuilder(); - sb.append("Coordinator Job ID : ").append(info.getId()).append(NL); - sb.append(HR).append(NL); - sb.append("Job Name : ").append(info.getAppName()).append(NL); - sb.append("App Path : ").append(info.getAppPath()).append(NL); - sb.append("Status : ").append(info.getStatus()).append(NL); - sb.append("User : ").append(info.getUser()).append(NL); - sb.append("Started : ").append(info.getStartTime()).append(NL); - sb.append("EndTime : ").append(info.getEndTime()).append(NL); - sb.append(HR_3).append(NL); - final String format = "%-40s %-10s %-40s %-10s %-30s %-30s"; - sb.append(String.format(format, - "Job ID", "Status", "Ext ID", "Err Code", "Created", - "Nominal Time")).append(NL); - sb.append(HR_3).append(NL); - for (CoordinatorAction cj : info.getActions()) { - sb.append(String.format(format, - cj.getId(), cj.getStatus(), cj.getExternalId(), cj.getErrorCode(), - cj.getCreatedTime(), cj.getNominalTime())).append(NL); - } - sb.append(HR_3).append(NL); - try { - FileUtils.writeStringToFile(file, sb.toString()); - } catch (IOException e) { - LOGGER.error("Couldn't write coord job info for " + coordJobId + ". " - + "Exception: " + e); - } - } - }, - - WfDump { - @Override - void writeLogs(OozieClient oozieClient, String location, Collection<File> filter) { - final List<WorkflowJob> wfJobsInfo; - try { - wfJobsInfo = oozieClient.getJobsInfo("", 0, 1000000); - } catch (OozieClientException e) { - LOGGER.error("Couldn't fetch list of bundles. Exception: " + e); - return; - } - for (WorkflowJob oneJobInfo : wfJobsInfo) { - final String wfJobId = oneJobInfo.getId(); - if (!skipInfo()) { - writeOneJobInfo(oozieClient, wfJobId, location, filter); - } - if (!skipLog()) { - writeOneJobLog(oozieClient, wfJobId, location, filter); - } - } - } - - /** - * Pull and dump info of one job. - * @param oozieClient oozie client that will be used for pulling log - * @param wfJobId job id of the workflow job - * @param location local location where logs will be dumped - * @param filter list of files that have already been dumped - */ - private void writeOneJobInfo(OozieClient oozieClient, String wfJobId, - String location, Collection<File> filter) { - final String fileName = OSUtil.concat(location, wfJobId + "-info.log"); - final File file = new File(fileName); - if (filter != null && filter.contains(file)) { - return; - } - final WorkflowJob info; - try { - info = oozieClient.getJobInfo(wfJobId); - } catch (OozieClientException e) { - LOGGER.error("Couldn't fetch bundle info for " + wfJobId + ". Exception: " + e); - return; - } - StringBuilder sb = new StringBuilder(); - sb.append("Workflow Job ID : ").append(info.getId()).append(NL); - sb.append(HR).append(NL); - sb.append("Wf Name : ").append(info.getAppName()).append(NL); - sb.append("App Path : ").append(info.getAppPath()).append(NL); - sb.append("Status : ").append(info.getStatus()).append(NL); - sb.append("Run : ").append(info.getRun()).append(NL); - sb.append("User : ").append(info.getUser()).append(NL); - sb.append("Group : ").append(info.getAcl()).append(NL); - sb.append("Created : ").append(info.getCreatedTime()).append(NL); - sb.append("Started : ").append(info.getStartTime()).append(NL); - sb.append("Last Modified : ").append(info.getLastModifiedTime()).append(NL); - sb.append("EndTime : ").append(info.getEndTime()).append(NL); - sb.append("External ID : ").append(info.getExternalId()).append(NL); - sb.append(NL).append("Actions").append(NL); - sb.append(HR_3).append(NL); - final String format = "%-80s %-10s %-40s %-15s %-10s"; - sb.append(String.format(format, - "Job ID", "Status", "Ext ID", "Ext Status", "Err Code")).append(NL); - sb.append(HR_3).append(NL); - for (WorkflowAction cj : info.getActions()) { - sb.append(String.format(format, - cj.getId(), cj.getStatus(), cj.getExternalId(), cj.getExternalStatus(), - cj.getErrorCode())).append(NL); - } - sb.append(HR_3).append(NL); - try { - FileUtils.writeStringToFile(file, sb.toString()); - } catch (IOException e) { - LOGGER.error("Couldn't write wf job info for " + wfJobId + ". Exception: " + e); - } - } - }; - - private static boolean skipInfo() { - return Config.getBoolean("log.capture.oozie.skip_info", false); - } - - private static boolean skipLog() { - return Config.getBoolean("log.capture.oozie.skip_log", false); - } - - /** - * Pull and dump info and log of all jobs of a type. - * @param oozieClient oozie client that will be used for pulling log - * @param location local location where logs will be dumped - * @param filter list of files that have already been dumped - */ - abstract void writeLogs(OozieClient oozieClient, String location, Collection<File> filter); - - /** - * Pull and dump log of one job. - * @param oozieClient oozie client that will be used for pulling log - * @param jobId job id of the job - * @param location local location where logs will be dumped - * @param filter list of files that have already been dumped - */ - private static void writeOneJobLog(OozieClient oozieClient, String jobId, - String location, Collection<File> filter) { - final String fileName = OSUtil.concat(location, jobId + ".log"); - assert fileName != null; - final File file = new File(fileName); - if (filter != null && filter.contains(file)) { - return; - } - try { - oozieClient.getJobLog(jobId, "", "", new PrintStream(file)); - } catch (OozieClientException e) { - LOGGER.error("Couldn't fetch log for " + jobId + ". Exception: " + e); - } catch (FileNotFoundException e) { - LOGGER.error("Couldn't write log for " + jobId + ". Exception: " + e); - } - } - } - - /** - * Pulls and dumps oozie logs at a configured location. - * @param coloHelper coloHelper of the cluster from which oozie logs are going to be pulled - * @param logLocation local location at which logs are going to be dumped - */ - public static void writeOozieLogs(ColoHelper coloHelper, String logLocation) { - final OozieClient oozieClient = coloHelper.getFeedHelper().getOozieClient(); - final String hostname = coloHelper.getClusterHelper().getQaHost(); - final String oozieLogLocation = OSUtil.concat(logLocation, "oozie_logs", hostname); - assert oozieLogLocation != null; - final File directory = new File(oozieLogLocation); - if (!directory.exists()) { - try { - FileUtils.forceMkdir(directory); - } catch (IOException e) { - LOGGER.error("Directory creation failed for: " + directory + ". Exception: " + e); - return; - } - } - final Collection<File> filter = FileUtils.listFiles(directory, null, true); - for (OozieDump oozieDump : OozieDump.values()) { - oozieDump.writeLogs(oozieClient, oozieLogLocation, filter); - } - } - -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/MatrixUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/MatrixUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/MatrixUtil.java deleted file mode 100644 index 14315b3..0000000 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/MatrixUtil.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.regression.core.util; - -import org.apache.commons.lang.ArrayUtils; -import org.testng.Assert; - -import java.util.Arrays; - -/** - * Util class for matrix operations. - */ -public final class MatrixUtil { - private MatrixUtil() { - throw new AssertionError("Instantiating utility class..."); - } - - /** - * Cross product many arrays. - * @param firstArray first array that you want to cross product - * @param otherArrays other arrays that you want to cross product - * @return cross product - */ - public static Object[][] crossProduct(Object[] firstArray, Object[]... otherArrays) { - if (otherArrays == null || otherArrays.length == 0) { - Object[][] result = new Object[firstArray.length][1]; - for (int i = 0; i < firstArray.length; ++i) { - result[i][0] = firstArray[i]; - } - return result; - } - // computing cross product for the rest of the arrays - Object[][] restArray = new Object[otherArrays.length-1][]; - System.arraycopy(otherArrays, 1, restArray, 0, otherArrays.length - 1); - Object[][] restCrossProduct = crossProduct(otherArrays[0], restArray); - //creating and initializing result array - Object[][] result = new Object[firstArray.length * restCrossProduct.length][]; - for(int i = 0; i < result.length; ++i) { - result[i] = new Object[otherArrays.length + 1]; - } - //doing the final cross product - for (int i = 0; i < firstArray.length; ++i) { - for (int j = 0; j < restCrossProduct.length; ++j) { - //computing one row of result - final int rowIdx = i * restCrossProduct.length + j; - result[rowIdx][0] = firstArray[i]; - System.arraycopy(restCrossProduct[j], 0, result[rowIdx], 1, otherArrays.length); - } - } - return result; - } - - public static Object[][] append(Object[][] arr1, Object[][] arr2) { - Assert.assertFalse(ArrayUtils.isEmpty(arr1), "arr1 can't be empty:" - + Arrays.deepToString(arr1)); - Assert.assertFalse(ArrayUtils.isEmpty(arr2), "arr2 can't be empty:" - + Arrays.deepToString(arr2)); - Assert.assertEquals(arr1[0].length, arr2[0].length, "Array rows are not compatible. " - + "row of first array: " + Arrays.deepToString(arr1[0]) - + "row of second array: " + Arrays.deepToString(arr2[0])); - return (Object[][]) ArrayUtils.addAll(arr1, arr2); - } - - /** - * Cross product many arrays. - * @param firstArray first array that you want to cross product - * @param otherArrays other arrays that you want to cross product - * @return cross product - */ - public static Object[][] crossProductNew(Object[] firstArray, Object[][]... otherArrays) { - return crossProduct(firstArray, otherArrays); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OSUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OSUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OSUtil.java deleted file mode 100644 index 19f5f57..0000000 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OSUtil.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.regression.core.util; - -import org.apache.commons.io.FilenameUtils; - -/** - * Util methods related to OS. - */ -public final class OSUtil { - private OSUtil() { - throw new AssertionError("Instantiating utility class..."); - } - - public static final boolean IS_WINDOWS = System.getProperty("os.name").toLowerCase().startsWith("windows"); - public static final String WIN_SU_BINARY = - Config.getProperty("windows.su.binary", "ExecuteAs.exe"); - - private static final String SEPARATOR = System.getProperty("file.separator", "/"); - - public static final String RESOURCES = concat("src", "test", "resources"); - public static final String RESOURCES_OOZIE = concat(RESOURCES, "oozie"); - public static final String OOZIE_EXAMPLE_INPUT_DATA = concat(RESOURCES, "OozieExampleInputData"); - public static final String NORMAL_INPUT = concat(OOZIE_EXAMPLE_INPUT_DATA, "normalInput"); - public static final String SINGLE_FILE = concat(OOZIE_EXAMPLE_INPUT_DATA, "SingleFile"); - public static final String OOZIE_COMBINED_ACTIONS = concat(RESOURCES, "combinedWorkflow"); - - public static final String OOZIE_LIB_FOLDER = concat(RESOURCES, "oozieLib"); - public static final String MULTIPLE_ACTION_WORKFLOW = concat(RESOURCES, "MultipleActionWorkflow"); - public static final String PIG_DIR = concat(RESOURCES, "pig"); - - - public static String concat(String path1, String path2, String... pathParts) { - String path = FilenameUtils.concat(path1, path2); - for (String pathPart : pathParts) { - path = FilenameUtils.concat(path, pathPart); - } - return path; - } -}
