OOZIE-2594 finalize the impl of MapReduceActionExecutor.kill(), adding tests
Change-Id: I09dce58bbd3c7f4534210394e35f6681a62b9bc9 Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/8d60f7f2 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/8d60f7f2 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/8d60f7f2 Branch: refs/heads/oya Commit: 8d60f7f25647ff0839e62d3e245d8c6f875c57b1 Parents: 095c584 Author: Peter Bacsko <[email protected]> Authored: Wed Nov 23 13:56:58 2016 +0100 Committer: Peter Bacsko <[email protected]> Committed: Wed Nov 23 14:56:02 2016 +0100 ---------------------------------------------------------------------- .../action/hadoop/MapReduceActionExecutor.java | 40 +++++++++++---- .../oozie/action/hadoop/BlockingMapper.java | 52 ++++++++++++++++++++ .../action/hadoop/MapperReducerForTest.java | 10 ++-- .../hadoop/TestMapReduceActionExecutor.java | 45 +++++++++++++++++ 4 files changed, 133 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/8d60f7f2/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java index e97de7e..11d1787 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java @@ -50,6 +50,11 @@ import org.apache.oozie.util.XmlUtils; import org.jdom.Element; import org.jdom.Namespace; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; +import com.google.common.io.Closeables; + public class MapReduceActionExecutor extends JavaActionExecutor { public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write"; @@ -401,16 +406,15 @@ public class MapReduceActionExecutor extends JavaActionExecutor { } @Override - public void kill(Context context, WorkflowAction action) throws ActionExecutorException { + public void kill(final Context context, final WorkflowAction action) throws ActionExecutorException { // Kill the LauncherAM which submits the MR job super.kill(context, action); // We have to check whether the MapReduce execution has started or not. If it has started, then we have to get // the YARN ApplicationID based on the tag and kill it as well - - // TODO: this must be tested in TestMapReduceActionExecutor + YarnClient yarnClient = null; try { - String tag = ActionExecutor.getActionYarnTag(new Configuration(), context.getWorkflow(), action); + String tag = LauncherMapperHelper.getTag(ActionExecutor.getActionYarnTag(new Configuration(), context.getWorkflow(), action)); GetApplicationsRequest gar = GetApplicationsRequest.newInstance(); gar.setScope(ApplicationsRequestScope.ALL); gar.setApplicationTags(Collections.singleton(tag)); @@ -420,16 +424,34 @@ public class MapReduceActionExecutor extends JavaActionExecutor { GetApplicationsResponse apps = proxy.getApplications(gar); List<ApplicationReport> appsList = apps.getApplicationList(); - YarnClient yarnClient = YarnClient.createYarnClient(); - yarnClient.init(actionConf); - yarnClient.start(); + if (appsList.size() > 1) { + String applications = Joiner.on(",").join(Iterables.transform(appsList, new Function<ApplicationReport, String>() { + @Override + public String apply(ApplicationReport input) { + return input.toString(); + } + })); + + LOG.error("Too many applications were returned: {0}", applications); + throw new IllegalArgumentException("Too many applications were returned"); + } else if (appsList.size() == 1) { - for (ApplicationReport app : appsList) { - LOG.info("Killing MapReduce job {0}", app.getApplicationId().toString()); + yarnClient = YarnClient.createYarnClient(); + yarnClient.init(actionConf); + yarnClient.start(); + + ApplicationReport app = appsList.get(0); + LOG.info("Killing MapReduce job {0}, YARN Id: {1}", action.getExternalChildIDs(), app.getApplicationId().toString()); yarnClient.killApplication(app.getApplicationId()); + } else { + LOG.info("No MapReduce job to kill"); } } catch (Exception e) { throw convertException(e); + } finally { + if (yarnClient != null) { + Closeables.closeQuietly(yarnClient); + } } } http://git-wip-us.apache.org/repos/asf/oozie/blob/8d60f7f2/core/src/test/java/org/apache/oozie/action/hadoop/BlockingMapper.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/BlockingMapper.java b/core/src/test/java/org/apache/oozie/action/hadoop/BlockingMapper.java new file mode 100644 index 0000000..0f4dcd6 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/action/hadoop/BlockingMapper.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.action.hadoop; + +import java.io.IOException; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; + +// A mapper task that blocks forever +public class BlockingMapper implements Mapper<Object, Object, Object, Object> { + + @Override + public void configure(JobConf job) { + // nop + } + + @Override + public void close() throws IOException { + // nop + } + + @Override + public void map(Object key, Object value, OutputCollector<Object, Object> output, Reporter reporter) + throws IOException { + try { + synchronized (this) { + wait(); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/8d60f7f2/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerForTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerForTest.java b/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerForTest.java index 8f08ddd..75ac716 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerForTest.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerForTest.java @@ -30,7 +30,7 @@ import org.apache.hadoop.mapred.Reducer; import java.io.IOException; import java.util.Iterator; -public class MapperReducerForTest implements Mapper, Reducer { +public class MapperReducerForTest implements Mapper<Object, Object, Object, Object>, Reducer<Object, Object, Object, Object> { public static final String GROUP = "g"; public static final String NAME = "c"; /** @@ -66,14 +66,14 @@ public class MapperReducerForTest implements Mapper, Reducer { public void close() throws IOException { } - @SuppressWarnings("unchecked") - public void map(Object key, Object value, OutputCollector collector, Reporter reporter) throws IOException { + @Override + public void map(Object key, Object value, OutputCollector<Object, Object> collector, Reporter reporter) throws IOException { collector.collect(key, value); reporter.incrCounter(GROUP, NAME, 5l); } - @SuppressWarnings("unchecked") - public void reduce(Object key, Iterator values, OutputCollector collector, Reporter reporter) + @Override + public void reduce(Object key, Iterator<Object> values, OutputCollector<Object, Object> collector, Reporter reporter) throws IOException { while (values.hasNext()) { collector.collect(key, values.next()); http://git-wip-us.apache.org/repos/asf/oozie/blob/8d60f7f2/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java index a21b7c7..78936c4 100644 --- a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java +++ b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java @@ -52,6 +52,7 @@ import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.streaming.StreamJob; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; @@ -494,6 +495,12 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { assertTrue(MapperReducerCredentialsForTest.hasCredentials(mrJob)); } + protected XConfiguration getSleepMapReduceConfig(String inputDir, String outputDir) { + XConfiguration conf = getMapReduceConfig(inputDir, outputDir); + conf.set("mapred.mapper.class", BlockingMapper.class.getName()); + return conf; + } + protected XConfiguration getMapReduceConfig(String inputDir, String outputDir) { XConfiguration conf = new XConfiguration(); conf.set("mapred.mapper.class", MapperReducerForTest.class.getName()); @@ -654,6 +661,44 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { assertTrue(errorProps.getProperty("exception.stacktrace").startsWith(OozieActionConfiguratorException.class.getName())); } + public void testMapReduceActionKill() throws Exception { + FileSystem fs = getFileSystem(); + + Path inputDir = new Path(getFsTestCaseDir(), "input"); + Path outputDir = new Path(getFsTestCaseDir(), "output"); + + Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); + w.write("dummy\n"); + w.write("dummy\n"); + w.close(); + + String actionXml = "<map-reduce>" + "<job-tracker>" + getResourceManagerUri() + "</job-tracker>" + "<name-node>" + + getNameNodeUri() + "</name-node>" + + getSleepMapReduceConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>"; + + Context context = createContext(MAP_REDUCE, actionXml); + final String launcherId = submitAction(context); + // wait until LauncherAM terminates - the MR job keeps running the background + waitUntilYarnAppDoneAndAssertSuccess(launcherId); + + MapReduceActionExecutor mae = new MapReduceActionExecutor(); + mae.check(context, context.getAction()); // must be called so that externalChildIDs are read from HDFS + JobConf conf = mae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); + String user = conf.get("user.name"); + JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); + final RunningJob mrJob = jobClient.getJob(JobID.forName(context.getAction().getExternalChildIDs())); + + mae.kill(context, context.getAction()); + + waitFor(10_000, new Predicate() { + @Override + public boolean evaluate() throws Exception { + return mrJob.isComplete(); + } + }); + assertEquals(JobStatus.State.KILLED, mrJob.getJobStatus().getState()); + } + public void testMapReduceWithCredentials() throws Exception { FileSystem fs = getFileSystem();
