OOZIE-2591 fix TestWorkflowActionKillXCommand and refactor TestJavaActionExecutor
Change-Id: I556684dee7a04a931e6cf1b33de563b7ba4828b2 Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/61f3a9f0 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/61f3a9f0 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/61f3a9f0 Branch: refs/heads/oya Commit: 61f3a9f083b5085bbc575d7e9d251aec03bfcae4 Parents: 9e2acd0 Author: Peter Bacsko <[email protected]> Authored: Thu Nov 10 12:27:08 2016 +0100 Committer: Peter Bacsko <[email protected]> Committed: Thu Nov 10 16:06:34 2016 +0100 ---------------------------------------------------------------------- .../action/hadoop/TestJavaActionExecutor.java | 33 +++++++--------- .../wf/TestWorkflowActionKillXCommand.java | 37 +++++++++--------- .../java/org/apache/oozie/test/XTestCase.java | 40 +++++++------------- .../oozie/action/hadoop/LauncherMain.java | 9 ++++- 4 files changed, 54 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/61f3a9f0/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java index 5d8bf34..bfc8ab4 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java @@ -27,7 +27,6 @@ import java.io.OutputStreamWriter; import java.io.StringReader; import java.io.Writer; import java.net.URI; -import java.security.PrivilegedExceptionAction; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Collections; @@ -35,10 +34,8 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Properties; -import java.util.Set; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.examples.SleepJob; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -63,7 +60,6 @@ import org.apache.oozie.service.ShareLibService; import org.apache.oozie.service.UUIDService; import org.apache.oozie.service.WorkflowAppService; import org.apache.oozie.service.WorkflowStoreService; -import org.apache.oozie.service.UserGroupInformationService; import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XmlUtils; @@ -100,7 +96,6 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { } - @SuppressWarnings("unchecked") public void testSetupMethods() throws Exception { JavaActionExecutor ae = new JavaActionExecutor(); assertEquals(Arrays.asList(JavaMain.class), ae.getLauncherClasses()); @@ -365,7 +360,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { "</java>"; Context context = createContext(actionXml, null); submitAction(context); - waitUntilYarnAppState(context.getAction().getExternalId(), YarnApplicationState.FINISHED); + waitUntilYarnAppDoneAndAssertSuccess(context.getAction().getExternalId()); ActionExecutor ae = new JavaActionExecutor(); ae.check(context, context.getAction()); assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); @@ -385,7 +380,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { "</java>"; Context context = createContext(actionXml, null); final String runningJob = submitAction(context); - waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED); + waitUntilYarnAppDoneAndAssertSuccess(runningJob); ActionExecutor ae = new JavaActionExecutor(); ae.check(context, context.getAction()); assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); @@ -410,7 +405,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { "</java>"; Context context = createContext(actionXml, null); final String runningJob = submitAction(context); - waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED); + waitUntilYarnAppDoneAndAssertSuccess(runningJob); ActionExecutor ae = new JavaActionExecutor(); try { ae.check(context, context.getAction()); @@ -441,7 +436,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { final String runningJobId = submitAction(context); ActionExecutor ae = new JavaActionExecutor(); assertFalse(ae.isCompleted(context.getAction().getExternalStatus())); - waitUntilYarnAppState(runningJobId, YarnApplicationState.FINISHED); + waitUntilYarnAppDoneAndAssertSuccess(runningJobId); ae.check(context, context.getAction()); assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); assertNull(context.getAction().getData()); @@ -460,7 +455,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { Context context = createContext(actionXml, null); final String runningJob = submitAction(context); - waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED); + waitUntilYarnAppDoneAndAssertSuccess(runningJob); ActionExecutor ae = new JavaActionExecutor(); ae.check(context, context.getAction()); assertTrue(ae.isCompleted(context.getAction().getExternalStatus())); @@ -481,7 +476,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { Context context = createContext(actionXml, null); final String runningJob = submitAction(context); - waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED); + waitUntilYarnAppDoneAndAssertSuccess(runningJob); //FIXME assertFalse(LauncherMapperHelper.isMainSuccessful(runningJob)); ActionExecutor ae = new JavaActionExecutor(); ae.check(context, context.getAction()); @@ -504,7 +499,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { Context context = createContext(actionXml, null); final String runningJob = submitAction(context); - waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED); + waitUntilYarnAppDoneAndAssertSuccess(runningJob); //FIXME assertFalse(LauncherMapperHelper.isMainSuccessful(runningJob)); ActionExecutor ae = new JavaActionExecutor(); ae.check(context, context.getAction()); @@ -526,7 +521,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { Context context = createContext(actionXml, null); final String runningJob = submitAction(context); - waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED); + waitUntilYarnAppDoneAndAssertSuccess(runningJob); //FIXME assertFalse(LauncherMapperHelper.isMainSuccessful(runningJob)); ActionExecutor ae = new JavaActionExecutor(); ae.check(context, context.getAction()); @@ -551,7 +546,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { assertEquals(WorkflowAction.Status.DONE, context.getAction().getStatus()); assertEquals("KILLED", context.getAction().getExternalStatus()); assertTrue(ae.isCompleted(context.getAction().getExternalStatus())); - waitUntilYarnAppState(runningJob, YarnApplicationState.KILLED); + waitUntilYarnAppKilledAndAssertSuccess(runningJob); } @@ -827,7 +822,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { "</java>"; Context context = createContext(actionXml, null); final String runningJob = submitAction(context); - waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED); + waitUntilYarnAppDoneAndAssertSuccess(runningJob); ActionExecutor ae = new JavaActionExecutor(); ae.check(context, context.getAction()); assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); @@ -1876,13 +1871,13 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { // Test when server side setting is not enabled JobConf launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, actionConf); - assertEquals("true", launcherConf.get(JavaActionExecutor.HADOOP_YARN_TIMELINE_SERVICE_ENABLED)); + assertEquals("false", launcherConf.get(JavaActionExecutor.HADOOP_YARN_TIMELINE_SERVICE_ENABLED)); // disabled by default ConfigurationService.set("oozie.action.launcher." + JavaActionExecutor.HADOOP_YARN_TIMELINE_SERVICE_ENABLED, "true"); // Test when server side setting is enabled but tez-site.xml is not in DistributedCache launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, actionConf); - assertEquals("true", launcherConf.get(JavaActionExecutor.HADOOP_YARN_TIMELINE_SERVICE_ENABLED)); + assertEquals("false", launcherConf.get(JavaActionExecutor.HADOOP_YARN_TIMELINE_SERVICE_ENABLED)); final Path tezSite = new Path("/tmp/tez-site.xml"); final FSDataOutputStream out = getFileSystem().create(tezSite); @@ -2202,7 +2197,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { ConfigurationService.set("oozie.action.sharelib.for.java", "java"); final String runningJob = submitAction(context); - waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED); + waitUntilYarnAppDoneAndAssertSuccess(runningJob); } public void testJobSubmissionWithoutYarnKill() throws Exception { @@ -2236,7 +2231,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { ConfigurationService.setBoolean(JavaActionExecutor.HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART, false); final String runningJob = submitAction(context, ae); - waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED); + waitUntilYarnAppDoneAndAssertSuccess(runningJob); } public void testDefaultConfigurationInLauncher() throws Exception { http://git-wip-us.apache.org/repos/asf/oozie/blob/61f3a9f0/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java index 72f0114..71b46d1 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java @@ -18,19 +18,18 @@ package org.apache.oozie.command.wf; -import java.io.StringReader; import java.net.URI; import java.util.Date; +import java.util.Set; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.examples.SleepJob; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.JobStatus; -import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.action.hadoop.LauncherMain; import org.apache.oozie.action.hadoop.MapperReducerForTest; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.client.WorkflowJob; @@ -42,11 +41,10 @@ import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Services; import org.apache.oozie.service.UUIDService; import org.apache.oozie.test.XDataTestCase; -import org.apache.oozie.test.XTestCase.Predicate; -import org.apache.oozie.util.XConfiguration; -import org.apache.oozie.util.XmlUtils; import org.apache.oozie.workflow.WorkflowInstance; +import com.google.common.collect.Sets; + public class TestWorkflowActionKillXCommand extends XDataTestCase { private Services services; @@ -117,6 +115,7 @@ public class TestWorkflowActionKillXCommand extends XDataTestCase { assertEquals(action.getExternalStatus(), "RUNNING"); } + // FIXME - fix JAE.kill() public void testWfActionKillChildJob() throws Exception { String externalJobID = launchSleepJob(1000); String childId = launchSleepJob(1000000); @@ -126,15 +125,8 @@ public class TestWorkflowActionKillXCommand extends XDataTestCase { WorkflowAction.Status.KILLED, childId); new ActionKillXCommand(action.getId()).call(); - JobClient jobClient = createJobClient(); - final RunningJob mrJob = jobClient.getJob(JobID.forName(childId)); - waitFor(60 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return mrJob.isComplete(); - } - }); - assertEquals(mrJob.getJobState(), JobStatus.KILLED); + waitUntilYarnAppKilledAndAssertSuccess(childId); } protected WorkflowActionBean addRecordToWfActionTable(String wfId, String externalJobID, String actionName, @@ -189,9 +181,18 @@ public class TestWorkflowActionKillXCommand extends XDataTestCase { SleepJob sleepjob = new SleepJob(); sleepjob.setConf(jobConf); jobConf = sleepjob.setupJobConf(1, 1, sleep, 1, sleep, 1); + jobConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, "sleepjob"); + jobConf.set(LauncherMain.MAPREDUCE_JOB_TAGS, "sleepjob"); + System.setProperty(LauncherMain.OOZIE_JOB_LAUNCH_TIME, String.valueOf(System.currentTimeMillis())); + + jobClient.submitJob(jobConf); + Set<ApplicationId> apps = Sets.newHashSet(); + apps = LauncherMain.getChildYarnJobs(jobConf, ApplicationsRequestScope.ALL); + assertEquals("Number of YARN apps", apps.size(), 1); + + sleepjob.close(); - final RunningJob runningJob = jobClient.submitJob(jobConf); - return runningJob.getID().toString(); + return apps.iterator().next().toString(); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/61f3a9f0/core/src/test/java/org/apache/oozie/test/XTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/test/XTestCase.java b/core/src/test/java/org/apache/oozie/test/XTestCase.java index f0c79b6..fd6d4ad 100644 --- a/core/src/test/java/org/apache/oozie/test/XTestCase.java +++ b/core/src/test/java/org/apache/oozie/test/XTestCase.java @@ -35,7 +35,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; -import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -44,10 +43,8 @@ import javax.persistence.FlushModeType; import javax.persistence.Query; import junit.framework.TestCase; -import net.sf.ehcache.store.compound.ImmutableValueElementCopyStrategy; import org.apache.commons.io.FilenameUtils; -import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.mutable.MutableObject; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; @@ -92,7 +89,6 @@ import org.apache.oozie.sla.SLARegistrationBean; import org.apache.oozie.sla.SLASummaryBean; import org.apache.oozie.store.StoreException; import org.apache.oozie.test.MiniHCatServer.RUNMODE; -import org.apache.oozie.test.XTestCase.Predicate; import org.apache.oozie.test.hive.MiniHS2; import org.apache.oozie.util.ClasspathUtils; import org.apache.oozie.util.IOUtils; @@ -100,11 +96,6 @@ import org.apache.oozie.util.ParamChecker; import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XLog; -import com.google.common.base.Enums; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - /** * Base JUnit <code>TestCase</code> subclass used by all Oozie testcases. * <p/> @@ -122,6 +113,7 @@ import com.google.common.collect.Sets; * From within testcases, system properties must be changed using the {@link #setSystemProperty} method. */ public abstract class XTestCase extends TestCase { + private static EnumSet<YarnApplicationState> YARN_TERMINAL_STATES = EnumSet.of(YarnApplicationState.FAILED, YarnApplicationState.KILLED, YarnApplicationState.FINISHED); private Map<String, String> sysProps; private String testCaseDir; private String testCaseConfDir; @@ -1235,48 +1227,44 @@ public abstract class XTestCase extends TestCase { return services; } - protected YarnApplicationState waitUntilYarnAppState(String externalId, final YarnApplicationState... acceptedStates) + protected YarnApplicationState waitUntilYarnAppState(String externalId, final EnumSet<YarnApplicationState> acceptedStates) throws HadoopAccessorException, IOException, YarnException { final ApplicationId appId = ConverterUtils.toApplicationId(externalId); - final Set<YarnApplicationState> states = Sets.immutableEnumSet(Lists.newArrayList(acceptedStates)); - final MutableBoolean endStateOK = new MutableBoolean(false); final MutableObject<YarnApplicationState> finalState = new MutableObject<YarnApplicationState>(); JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(getJobTrackerUri()); - // This is needed here because we need a mutable final YarnClient - final MutableObject<YarnClient> yarnClientMO = new MutableObject<YarnClient>(null); + final YarnClient yarnClient = Services.get().get(HadoopAccessorService.class).createYarnClient(getTestUser(), jobConf); + try { - yarnClientMO.setValue(Services.get().get(HadoopAccessorService.class).createYarnClient(getTestUser(), jobConf)); waitFor(60 * 1000, new Predicate() { @Override public boolean evaluate() throws Exception { - YarnApplicationState state = yarnClientMO.getValue().getApplicationReport(appId).getYarnApplicationState(); + YarnApplicationState state = yarnClient.getApplicationReport(appId).getYarnApplicationState(); finalState.setValue(state); - if (states.contains(state)) { - endStateOK.setValue(true); - return true; - } else { - return false; - } + return acceptedStates.contains(state); } }); } finally { - if (yarnClientMO.getValue() != null) { - yarnClientMO.getValue().close(); + if (yarnClient != null) { + yarnClient.close(); } } log.info("Final state is: {0}", finalState.getValue()); - assertTrue(endStateOK.isTrue()); return finalState.getValue(); } protected void waitUntilYarnAppDoneAndAssertSuccess(String externalId) throws HadoopAccessorException, IOException, YarnException { - YarnApplicationState state = waitUntilYarnAppState(externalId, YarnApplicationState.FAILED, YarnApplicationState.KILLED, YarnApplicationState.FINISHED); + YarnApplicationState state = waitUntilYarnAppState(externalId, YARN_TERMINAL_STATES); assertEquals("YARN App state", YarnApplicationState.FINISHED, state); } + protected void waitUntilYarnAppKilledAndAssertSuccess(String externalId) throws HadoopAccessorException, IOException, YarnException { + YarnApplicationState state = waitUntilYarnAppState(externalId, YARN_TERMINAL_STATES); + assertEquals("YARN App state", YarnApplicationState.KILLED, state); + } + protected YarnApplicationState getYarnApplicationState(String externalId) throws HadoopAccessorException, IOException, YarnException { final ApplicationId appId = ConverterUtils.toApplicationId(externalId); YarnApplicationState state = null; http://git-wip-us.apache.org/repos/asf/oozie/blob/61f3a9f0/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java index 338bce8..31200af 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java @@ -125,7 +125,11 @@ public abstract class LauncherMain { } } - private static Set<ApplicationId> getChildYarnJobs(Configuration actionConf) { + public static Set<ApplicationId> getChildYarnJobs(Configuration actionConf) { + return getChildYarnJobs(actionConf, ApplicationsRequestScope.OWN); + } + + public static Set<ApplicationId> getChildYarnJobs(Configuration actionConf, ApplicationsRequestScope scope) { System.out.println("Fetching child yarn jobs"); Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>(); String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS); @@ -142,8 +146,9 @@ public abstract class LauncherMain { } GetApplicationsRequest gar = GetApplicationsRequest.newInstance(); - gar.setScope(ApplicationsRequestScope.OWN); + gar.setScope(scope); gar.setApplicationTags(Collections.singleton(tag)); + long endTime = System.currentTimeMillis(); if (startTime > endTime) { System.out.println("WARNING: Clock skew between the Oozie server host and this host detected. Please fix this. " +
