Repository: oozie Updated Branches: refs/heads/master 3badb2d0b -> 4bd777e5e
OOZIE-1728 When an ApplicationMaster restarts, it restarts the launcher job: DistCp followup (ryota) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/4bd777e5 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/4bd777e5 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/4bd777e5 Branch: refs/heads/master Commit: 4bd777e5e45ce097aa07fbc1385623139d102069 Parents: 3badb2d Author: egashira <[email protected]> Authored: Wed Oct 15 15:33:33 2014 -0700 Committer: egashira <[email protected]> Committed: Wed Oct 15 15:33:33 2014 -0700 ---------------------------------------------------------------------- core/pom.xml | 6 + .../action/hadoop/DistcpActionExecutor.java | 21 ++- .../action/hadoop/TestDistCpActionExecutor.java | 157 ++++++++++++++++++ .../oozie/action/hadoop/TestDistcpMain.java | 73 +++++++++ release-log.txt | 3 +- sharelib/distcp/pom.xml | 11 -- .../apache/oozie/action/hadoop/DistcpMain.java | 97 ++++++++++++ .../action/hadoop/TestDistCpActionExecutor.java | 158 ------------------- 8 files changed, 352 insertions(+), 174 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/4bd777e5/core/pom.xml ---------------------------------------------------------------------- diff --git a/core/pom.xml b/core/pom.xml index 7cd1f70..597775c 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -260,6 +260,12 @@ <scope>compile</scope> </dependency> + <dependency> + <groupId>org.apache.oozie</groupId> + <artifactId>oozie-sharelib-distcp</artifactId> + <scope>compile</scope> + </dependency> + <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> http://git-wip-us.apache.org/repos/asf/oozie/blob/4bd777e5/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java index 86d21fb..4d2f7b2 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java @@ -18,6 +18,7 @@ package org.apache.oozie.action.hadoop; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -27,9 +28,9 @@ import org.apache.oozie.service.Services; import org.apache.oozie.util.XLog; import org.jdom.Element; - public class DistcpActionExecutor extends JavaActionExecutor{ - public static final String CONF_OOZIE_DISTCP_ACTION_MAIN_CLASS = "org.apache.hadoop.tools.DistCp"; + public static final String CONF_OOZIE_DISTCP_ACTION_MAIN_CLASS = "org.apache.oozie.action.hadoop.DistcpMain"; + private static final String DISTCP_MAIN_CLASS_NAME = "org.apache.hadoop.tools.DistCp"; public static final String CLASS_NAMES = "oozie.actions.main.classnames"; private static final XLog LOG = XLog.getLog(DistcpActionExecutor.class); public static final String DISTCP_TYPE = "distcp"; @@ -47,13 +48,20 @@ public class DistcpActionExecutor extends JavaActionExecutor{ if(name != null){ classNameDistcp = name; } - actionConf.set(JavaMain.JAVA_MAIN_CLASS, classNameDistcp); + actionConf.set(JavaMain.JAVA_MAIN_CLASS, DISTCP_MAIN_CLASS_NAME); return actionConf; } @Override public List<Class> getLauncherClasses() { - return super.getLauncherClasses(); + List<Class> classes = new ArrayList<Class>(); + try { + classes.add(Class.forName(CONF_OOZIE_DISTCP_ACTION_MAIN_CLASS)); + } + catch (ClassNotFoundException e) { + throw new RuntimeException("Class not found", e); + } + return classes; } /** @@ -106,4 +114,9 @@ public class DistcpActionExecutor extends JavaActionExecutor{ return "distcp"; } + @Override + protected String getLauncherMain(Configuration launcherConf, Element actionXml) { + return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, CONF_OOZIE_DISTCP_ACTION_MAIN_CLASS); + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/4bd777e5/core/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java new file mode 100644 index 0000000..d6ac554 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.action.hadoop; + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.service.HadoopAccessorService; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.WorkflowAppService; +import org.apache.oozie.util.IOUtils; +import org.apache.oozie.util.XConfiguration; + +public class TestDistCpActionExecutor extends ActionExecutorTestCase{ + + @Override + protected void setSystemProps() throws Exception { + super.setSystemProps(); + setSystemProperty("oozie.service.ActionService.executor.classes", DistcpActionExecutor.class.getName()); + } + + @SuppressWarnings("unchecked") + public void testSetupMethods() throws Exception { + DistcpActionExecutor ae = new DistcpActionExecutor(); + assertEquals(Arrays.asList(DistcpMain.class), ae.getLauncherClasses()); + } + + public void testDistCpFile() throws Exception { + Path inputPath = new Path(getFsTestCaseDir(), "input.txt"); + final Path outputPath = new Path(getFsTestCaseDir(), "output.txt"); + byte[] content = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".getBytes(); + + OutputStream os = getFileSystem().create(inputPath); + os.write(content); + os.close(); + + String actionXml = "<distcp>" + + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + + "<name-node>" + getNameNodeUri() + "</name-node>" + + "<arg>" + inputPath + "</arg>"+ + "<arg>" + outputPath + "</arg>" + + "</distcp>"; + Context context = createContext(actionXml); + final RunningJob runningJob = submitAction(context); + waitFor(60 * 1000, new Predicate() { + public boolean evaluate() throws Exception { + return runningJob.isComplete(); + } + }); + assertTrue(runningJob.isSuccessful()); + + waitFor(60 * 1000, new Predicate() { + public boolean evaluate() throws Exception { + return getFileSystem().exists(outputPath); + } + }); + assertTrue(getFileSystem().exists(outputPath)); + + byte[] readContent = new byte[content.length]; + InputStream is = getFileSystem().open(outputPath); + int offset = 0; + while (offset < readContent.length) + { + int numRead = is.read(readContent, offset, readContent.length); + if(numRead == -1) { + break; + } + offset += numRead; + } + assertEquals(is.read(), -1); + is.close(); + offset = 0; + while (offset < readContent.length) + { + assertEquals(readContent[offset], content[offset]); + offset++; + } + } + + + protected Context createContext(String actionXml) throws Exception { + DistcpActionExecutor ae = new DistcpActionExecutor(); + + Path appJarPath = new Path("lib/test.jar"); + File jarFile = IOUtils.createJar(new File(getTestCaseDir()), "test.jar", LauncherMainTester.class); + InputStream is = new FileInputStream(jarFile); + OutputStream os = getFileSystem().create(new Path(getAppPath(), "lib/test.jar")); + IOUtils.copyStream(is, os); + + Path appSoPath = new Path("lib/test.so"); + getFileSystem().create(new Path(getAppPath(), appSoPath)).close(); + + XConfiguration protoConf = new XConfiguration(); + protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser()); + protoConf.setStrings(WorkflowAppService.APP_LIB_PATH_LIST, appJarPath.toString(), appSoPath.toString()); + + + WorkflowJobBean wf = createBaseWorkflow(protoConf, "action"); + WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0); + action.setType(ae.getType()); + action.setConf(actionXml); + + return new Context(wf, action); + } + + protected RunningJob submitAction(Context context) throws Exception { + DistcpActionExecutor ae = new DistcpActionExecutor(); + + WorkflowAction action = context.getAction(); + + ae.prepareActionDir(getFileSystem(), context); + ae.submitLauncher(getFileSystem(), context, action); + + String jobId = action.getExternalId(); + String jobTracker = action.getTrackerUri(); + String consoleUrl = action.getConsoleUrl(); + assertNotNull(jobId); + assertNotNull(jobTracker); + assertNotNull(consoleUrl); + + JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); + jobConf.set("mapred.job.tracker", jobTracker); + + JobClient jobClient = + Services.get().get(HadoopAccessorService.class).createJobClient(getTestUser(), jobConf); + final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); + assertNotNull(runningJob); + return runningJob; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/4bd777e5/core/src/test/java/org/apache/oozie/action/hadoop/TestDistcpMain.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestDistcpMain.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestDistcpMain.java new file mode 100644 index 0000000..84351f1 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestDistcpMain.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.action.hadoop; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.oozie.util.XConfiguration; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; + +import org.apache.oozie.action.hadoop.DistcpMain; + +public class TestDistcpMain extends MainTestCase { + + @Override + public Void call() throws Exception { + + XConfiguration jobConf = new XConfiguration(); + XConfiguration.copy(createJobConf(), jobConf); + + FileSystem fs = getFileSystem(); + Path inputDir = new Path(getFsTestCaseDir(), "input"); + fs.mkdirs(inputDir); + Writer writer = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); + writer.write("hello"); + writer.close(); + Path outputDir = new Path(getFsTestCaseDir(), "output"); + + jobConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, "org.apache.hadoop.tools.DistCp"); + + jobConf.set("mapreduce.job.tags", "" + System.currentTimeMillis()); + setSystemProperty("oozie.job.launch.time", "" + System.currentTimeMillis()); + + File actionXml = new File(getTestCaseDir(), "action.xml"); + OutputStream os = new FileOutputStream(actionXml); + jobConf.writeXml(os); + os.close(); + + System.setProperty("oozie.action.conf.xml", actionXml.getAbsolutePath()); + + // Check normal execution + DistcpMain.main(new String[]{inputDir.toString(), outputDir.toString()}); + assertTrue(getFileSystem().exists(outputDir)); + + // Check exception handling + try { + DistcpMain.main(new String[0]); + } catch(RuntimeException re) { + assertTrue(re.getMessage().indexOf("Returned value from distcp is non-zero") != -1); + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/4bd777e5/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 353174c..6fb9a8f 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,7 +1,7 @@ -- Oozie 4.2.0 release (trunk - unreleased) +OOZIE-1728 When an ApplicationMaster restarts, it restarts the launcher job: DistCp followup (ryota) OOZIE-2009 Requeue CoordActionInputCheck in case of permission error (ryota) -OOZIE-2005 Coordinator rerun fails to initialize error code and message (ryota) OOZIE-1896 ZKUUIDService - Too many job submission fails (puru) OOZIE-2019 SLA miss processed on server2 not send email (puru) OOZIE-1391 Sub wf suspend doesn't update parent wf (jaydeepvishwakarma via shwethags) @@ -35,6 +35,7 @@ OOZIE-1943 Bump up trunk to 4.2.0-SNAPSHOT (bzhang) -- Oozie 4.1.0 release (4.1 - unreleased) +OOZIE-2005 Coordinator rerun fails to initialize error code and message (ryota) OOZIE-2026 fix synchronization in SLACalculatorMemory.addJobStatus to avoid duplicated SLA message (ryota) OOZIE-2017 On startup, StatusTransitService can transition Coordinators that were in PREPSUSPENDED to RUNNING (rkanter) OOZIE-1932 Services should load CallableQueueService after MemoryLocksService (mona) http://git-wip-us.apache.org/repos/asf/oozie/blob/4bd777e5/sharelib/distcp/pom.xml ---------------------------------------------------------------------- diff --git a/sharelib/distcp/pom.xml b/sharelib/distcp/pom.xml index 04e436d..b788ed0 100644 --- a/sharelib/distcp/pom.xml +++ b/sharelib/distcp/pom.xml @@ -45,17 +45,6 @@ </dependency> <dependency> <groupId>org.apache.oozie</groupId> - <artifactId>oozie-core</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.oozie</groupId> - <artifactId>oozie-core</artifactId> - <classifier>tests</classifier> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.oozie</groupId> <artifactId>oozie-hadoop</artifactId> <scope>provided</scope> </dependency> http://git-wip-us.apache.org/repos/asf/oozie/blob/4bd777e5/sharelib/distcp/src/main/java/org/apache/oozie/action/hadoop/DistcpMain.java ---------------------------------------------------------------------- diff --git a/sharelib/distcp/src/main/java/org/apache/oozie/action/hadoop/DistcpMain.java b/sharelib/distcp/src/main/java/org/apache/oozie/action/hadoop/DistcpMain.java new file mode 100644 index 0000000..67b445e --- /dev/null +++ b/sharelib/distcp/src/main/java/org/apache/oozie/action/hadoop/DistcpMain.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.action.hadoop; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.Tool; + +public class DistcpMain extends JavaMain { + + private Constructor<?> construct; + private Object[] constArgs; + + public static void main(String[] args) throws Exception { + run(DistcpMain.class, args); + } + + @Override + protected void run(String[] args) throws Exception { + + Configuration actionConf = loadActionConf(); + LauncherMainHadoopUtils.killChildYarnJobs(actionConf); + Class<?> klass = actionConf.getClass(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, + org.apache.hadoop.tools.DistCp.class); + System.out.println("Main class : " + klass.getName()); + System.out.println("Arguments :"); + for (String arg : args) { + System.out.println(" " + arg); + } + + // propagate delegation related props from launcher job to MR job + if (getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION") != null) { + actionConf.set("mapreduce.job.credentials.binary", getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION")); + } + + getConstructorAndArgs(klass, actionConf); + if (construct == null) { + throw new RuntimeException("Distcp constructor was not found, unable to instantiate"); + } + if (constArgs == null) { + throw new RuntimeException("Arguments for distcp constructor is null, unable to instantiate"); + } + try { + Tool distcp = (Tool) construct.newInstance(constArgs); + int i = distcp.run(args); + if (i != 0) { + throw new RuntimeException("Returned value from distcp is non-zero (" + i + ")"); + } + } + catch (InvocationTargetException ex) { + throw new JavaMainException(ex.getCause()); + } + } + + protected void getConstructorAndArgs(Class<?> klass, Configuration actionConf) throws Exception { + Constructor<?>[] allConstructors = klass.getConstructors(); + for (Constructor<?> cstruct : allConstructors) { + Class<?>[] pType = cstruct.getParameterTypes(); + construct = cstruct; + if (pType.length == 1 && pType[0].equals(Class.forName("org.apache.hadoop.conf.Configuration"))) { + System.out.println("found Distcp v1 Constructor"); + System.out.println(" " + cstruct.toString()); + constArgs = new Object[1]; + constArgs[0] = actionConf; + break; + } + else if (pType.length == 2 && pType[0].equals(Class.forName("org.apache.hadoop.conf.Configuration"))) { + // 2nd argument is org.apache.hadoop.tools.DistCpOptions + System.out.println("found Distcp v2 Constructor"); + System.out.println(" " + cstruct.toString()); + constArgs = new Object[2]; + constArgs[0] = actionConf; + constArgs[1] = null; + break; + } + } + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/4bd777e5/sharelib/distcp/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/distcp/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java b/sharelib/distcp/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java deleted file mode 100644 index 7a098f3..0000000 --- a/sharelib/distcp/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.oozie.action.hadoop; - -import java.io.File; -import java.io.FileInputStream; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Arrays; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.RunningJob; -import org.apache.oozie.WorkflowActionBean; -import org.apache.oozie.WorkflowJobBean; -import org.apache.oozie.client.WorkflowAction; -import org.apache.oozie.service.HadoopAccessorService; -import org.apache.oozie.service.Services; -import org.apache.oozie.service.WorkflowAppService; -import org.apache.oozie.util.IOUtils; -import org.apache.oozie.util.XConfiguration; - -public class TestDistCpActionExecutor extends ActionExecutorTestCase{ - - @Override - protected void setSystemProps() throws Exception { - super.setSystemProps(); - setSystemProperty("oozie.service.ActionService.executor.classes", DistcpActionExecutor.class.getName()); - } - - @SuppressWarnings("unchecked") - public void testSetupMethods() throws Exception { - DistcpActionExecutor ae = new DistcpActionExecutor(); - assertEquals(Arrays.asList(JavaMain.class), ae.getLauncherClasses()); - } - - public void testDistCpFile() throws Exception { - Path inputPath = new Path(getFsTestCaseDir(), "input.txt"); - final Path outputPath = new Path(getFsTestCaseDir(), "output.txt"); - byte[] content = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".getBytes(); - - OutputStream os = getFileSystem().create(inputPath); - os.write(content); - os.close(); - - String actionXml = "<distcp>" + - "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + - "<name-node>" + getNameNodeUri() + "</name-node>" + - "<arg>" + inputPath + "</arg>"+ - "<arg>" + outputPath + "</arg>" + - "</distcp>"; - Context context = createContext(actionXml); - final RunningJob runningJob = submitAction(context); - waitFor(60 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return runningJob.isComplete(); - } - }); - assertTrue(runningJob.isSuccessful()); - - waitFor(60 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return getFileSystem().exists(outputPath); - } - }); - assertTrue(getFileSystem().exists(outputPath)); - - byte[] readContent = new byte[content.length]; - InputStream is = getFileSystem().open(outputPath); - int offset = 0; - while (offset < readContent.length) - { - int numRead = is.read(readContent, offset, readContent.length); - if(numRead == -1) { - break; - } - offset += numRead; - } - assertEquals(is.read(), -1); - is.close(); - offset = 0; - while (offset < readContent.length) - { - assertEquals(readContent[offset], content[offset]); - offset++; - } - } - - - protected Context createContext(String actionXml) throws Exception { - DistcpActionExecutor ae = new DistcpActionExecutor(); - - Path appJarPath = new Path("lib/test.jar"); - File jarFile = IOUtils.createJar(new File(getTestCaseDir()), "test.jar", LauncherMainTester.class); - InputStream is = new FileInputStream(jarFile); - OutputStream os = getFileSystem().create(new Path(getAppPath(), "lib/test.jar")); - IOUtils.copyStream(is, os); - - Path appSoPath = new Path("lib/test.so"); - getFileSystem().create(new Path(getAppPath(), appSoPath)).close(); - - XConfiguration protoConf = new XConfiguration(); - protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser()); - protoConf.setStrings(WorkflowAppService.APP_LIB_PATH_LIST, appJarPath.toString(), appSoPath.toString()); - - - WorkflowJobBean wf = createBaseWorkflow(protoConf, "action"); - WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0); - action.setType(ae.getType()); - action.setConf(actionXml); - - return new Context(wf, action); - } - - - protected RunningJob submitAction(Context context) throws Exception { - DistcpActionExecutor ae = new DistcpActionExecutor(); - - WorkflowAction action = context.getAction(); - - ae.prepareActionDir(getFileSystem(), context); - ae.submitLauncher(getFileSystem(), context, action); - - String jobId = action.getExternalId(); - String jobTracker = action.getTrackerUri(); - String consoleUrl = action.getConsoleUrl(); - assertNotNull(jobId); - assertNotNull(jobTracker); - assertNotNull(consoleUrl); - - JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); - jobConf.set("mapred.job.tracker", jobTracker); - - JobClient jobClient = - Services.get().get(HadoopAccessorService.class).createJobClient(getTestUser(), jobConf); - final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); - assertNotNull(runningJob); - return runningJob; - } -}
