Author: omalley
Date: Fri Mar 4 03:35:41 2011
New Revision: 1077050
URL: http://svn.apache.org/viewvc?rev=1077050&view=rev
Log:
commit 5c98aaf6bed0247e8a6c6dd763e14f50eb1b173c
Author: Arun C Murthy <[email protected]>
Date: Mon Nov 9 16:24:12 2009 -0800
MAPREDUCE:1196 from
https://issues.apache.org/jira/secure/attachment/12424351/MAPREDUCE-1196_yhadoop20.patch
+++ b/YAHOO-CHANGES.txt
+yahoo-hadoop-0.20.1-3092118005:
+
+ MAPREDUCE-1196. Fix FileOutputCommitter to use the deprecated
cleanupJob
+ api correctly. (acmurthy)
+
Added:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/lib/output/
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/build.xml
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobCleanup.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
Modified: hadoop/common/branches/branch-0.20-security-patches/build.xml
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/build.xml?rev=1077050&r1=1077049&r2=1077050&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/build.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/build.xml Fri Mar 4
03:35:41 2011
@@ -27,7 +27,7 @@
<property name="Name" value="Hadoop"/>
<property name="name" value="hadoop"/>
- <property name="version" value="0.20.1.3092118004"/>
+ <property name="version" value="0.20.1.3092118005"/>
<property name="final.name" value="${name}-${version}"/>
<property name="year" value="2009"/>
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=1077050&r1=1077049&r2=1077050&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java
Fri Mar 4 03:35:41 2011
@@ -77,13 +77,15 @@ public class FileOutputCommitter extends
@Override
public void commitJob(JobContext context) throws IOException {
- cleanup(context);
+ cleanupJob(context);
if (getOutputDirMarking(context.getJobConf())) {
markSuccessfulOutputDir(context);
}
}
- private void cleanup(JobContext context) throws IOException {
+ @Override
+ @Deprecated
+ public void cleanupJob(JobContext context) throws IOException {
JobConf conf = context.getJobConf();
// do the clean up of temporary directory
Path outputPath = FileOutputFormat.getOutputPath(conf);
@@ -107,7 +109,7 @@ public class FileOutputCommitter extends
*/
@Override
public void abortJob(JobContext context, int runState) throws IOException {
- cleanup(context);
+ cleanupJob(context);
}
public void setupTask(TaskAttemptContext context) throws IOException {
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java?rev=1077050&r1=1077049&r2=1077050&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/Job.java
Fri Mar 4 03:35:41 2011
@@ -265,6 +265,18 @@ public class Job extends JobContext {
}
/**
+ * Get the <i>progress</i> of the job's setup, as a float between 0.0
+ * and 1.0. When the job setup is completed, the function returns 1.0.
+ *
+ * @return the progress of the job's setup.
+ * @throws IOException
+ */
+ public float setupProgress() throws IOException {
+ ensureState(JobState.RUNNING);
+ return info.setupProgress();
+ }
+
+ /**
* Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
* and 1.0. When all map tasks have completed, the function returns 1.0.
*
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=1077050&r1=1077049&r2=1077050&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
Fri Mar 4 03:35:41 2011
@@ -110,7 +110,7 @@ public class FileOutputCommitter extends
*/
public void commitJob(JobContext context) throws IOException {
// delete the _temporary folder
- cleanup(context);
+ cleanupJob(context);
// check if the o/p dir should be marked
if (shouldMarkOutputDir(context.getConfiguration())) {
// create a _success file in the o/p folder
@@ -118,8 +118,9 @@ public class FileOutputCommitter extends
}
}
- private void cleanup(JobContext context)
- throws IOException {
+ @Override
+ @Deprecated
+ public void cleanupJob(JobContext context) throws IOException {
if (outputPath != null) {
Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
@@ -139,7 +140,7 @@ public class FileOutputCommitter extends
@Override
public void abortJob(JobContext context, JobStatus.State state)
throws IOException {
- cleanup(context);
+ cleanupJob(context);
}
/**
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobCleanup.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobCleanup.java?rev=1077050&r1=1077049&r2=1077050&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobCleanup.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobCleanup.java
Fri Mar 4 03:35:41 2011
@@ -41,6 +41,8 @@ public class TestJobCleanup extends Test
private static String TEST_ROOT_DIR =
new File(System.getProperty("test.build.data", "/tmp") + "/"
+ "test-job-cleanup").toString();
+ private static final String CUSTOM_CLEANUP_FILE_NAME =
+ "_custom_cleanup";
private static final String ABORT_KILLED_FILE_NAME =
"_custom_abort_killed";
private static final String ABORT_FAILED_FILE_NAME =
@@ -86,6 +88,21 @@ public class TestJobCleanup extends Test
}
/**
+ * Committer with deprecated {@link
FileOutputCommitter#cleanupJob(JobContext)}
+ * making a _failed/_killed in the output folder
+ */
+ static class CommitterWithCustomDeprecatedCleanup extends
FileOutputCommitter {
+ @Override
+ public void cleanupJob(JobContext context) throws IOException {
+ System.err.println("---- HERE ----");
+ JobConf conf = context.getJobConf();
+ Path outputPath = FileOutputFormat.getOutputPath(conf);
+ FileSystem fs = outputPath.getFileSystem(conf);
+ fs.create(new Path(outputPath, CUSTOM_CLEANUP_FILE_NAME)).close();
+ }
+ }
+
+ /**
* Committer with abort making a _failed/_killed in the output folder
*/
static class CommitterWithCustomAbort extends FileOutputCommitter {
@@ -264,4 +281,26 @@ public class TestJobCleanup extends Test
new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME,
ABORT_FAILED_FILE_NAME});
}
+
+ /**
+ * Test if a failed job with custom committer runs the deprecated
+ * {@link FileOutputCommitter#cleanupJob(JobContext)} code for api
+ * compatibility testing.
+ */
+ public void testCustomCleanup() throws IOException {
+ // check with a successful job
+ testSuccessfulJob(CUSTOM_CLEANUP_FILE_NAME,
+ CommitterWithCustomDeprecatedCleanup.class,
+ new String[] {});
+
+ // check with a failed job
+ testFailedJob(CUSTOM_CLEANUP_FILE_NAME,
+ CommitterWithCustomDeprecatedCleanup.class,
+ new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+
+ // check with a killed job
+ testKilledJob(TestJobCleanup.CUSTOM_CLEANUP_FILE_NAME,
+ CommitterWithCustomDeprecatedCleanup.class,
+ new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+ }
}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=1077050&r1=1077049&r2=1077050&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
Fri Mar 4 03:35:41 2011
@@ -222,7 +222,7 @@ public class UtilsForTests {
/**
* A utility that waits for specified amount of time
*/
- static void waitFor(long duration) {
+ public static void waitFor(long duration) {
try {
synchronized (waitLock) {
waitLock.wait(duration);
Added:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java?rev=1077050&view=auto
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java
(added)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/lib/output/TestJobOutputCommitter.java
Fri Mar 4 03:35:41 2011
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapred.UtilsForTests;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * A JUnit test to test Map-Reduce job committer.
+ */
+public class TestJobOutputCommitter extends HadoopTestCase {
+
+ public TestJobOutputCommitter() throws IOException {
+ super(CLUSTER_MR, LOCAL_FS, 1, 1);
+ }
+
+ private static String TEST_ROOT_DIR = new File(System.getProperty(
+ "test.build.data", "/tmp")
+ + "/" + "test-job-cleanup").toString();
+ private static final String CUSTOM_CLEANUP_FILE_NAME = "_custom_cleanup";
+ private static final String ABORT_KILLED_FILE_NAME = "_custom_abort_killed";
+ private static final String ABORT_FAILED_FILE_NAME = "_custom_abort_failed";
+ private static Path inDir = new Path(TEST_ROOT_DIR, "test-input");
+ private static int outDirs = 0;
+ private FileSystem fs;
+ private Configuration conf = null;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ conf = createJobConf();
+ conf.setBoolean(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
true);
+ fs = getFileSystem();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ fs.delete(new Path(TEST_ROOT_DIR), true);
+ super.tearDown();
+ }
+
+ /**
+ * Committer with deprecated {@link
FileOutputCommitter#cleanupJob(JobContext)}
+ * making a _failed/_killed in the output folder
+ */
+ static class CommitterWithCustomDeprecatedCleanup extends
FileOutputCommitter {
+ public CommitterWithCustomDeprecatedCleanup(Path outputPath,
+ TaskAttemptContext context) throws IOException {
+ super(outputPath, context);
+ }
+
+ @Override
+ public void cleanupJob(JobContext context) throws IOException {
+ System.err.println("---- HERE ----");
+ Path outputPath = FileOutputFormat.getOutputPath(context);
+ FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
+ fs.create(new Path(outputPath, CUSTOM_CLEANUP_FILE_NAME)).close();
+ }
+ }
+
+ /**
+ * Committer with abort making a _failed/_killed in the output folder
+ */
+ static class CommitterWithCustomAbort extends FileOutputCommitter {
+ public CommitterWithCustomAbort(Path outputPath, TaskAttemptContext
context)
+ throws IOException {
+ super(outputPath, context);
+ }
+
+ @Override
+ public void abortJob(JobContext context, JobStatus.State state)
+ throws IOException {
+ Path outputPath = FileOutputFormat.getOutputPath(context);
+ FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
+ String fileName =
+ (state.equals(JobStatus.State.FAILED)) ? ABORT_FAILED_FILE_NAME
+ : ABORT_KILLED_FILE_NAME;
+ fs.create(new Path(outputPath, fileName)).close();
+ }
+ }
+
+ private Path getNewOutputDir() {
+ return new Path(TEST_ROOT_DIR, "output-" + outDirs++);
+ }
+
+ static class MyOutputFormatWithCustomAbort<K, V>
+ extends TextOutputFormat<K, V> {
+ private OutputCommitter committer = null;
+
+ public synchronized OutputCommitter getOutputCommitter(
+ TaskAttemptContext context) throws IOException {
+ if (committer == null) {
+ Path output = getOutputPath(context);
+ committer = new CommitterWithCustomAbort(output, context);
+ }
+ return committer;
+ }
+ }
+
+ static class MyOutputFormatWithCustomCleanup<K, V>
+ extends TextOutputFormat<K, V> {
+ private OutputCommitter committer = null;
+
+ public synchronized OutputCommitter getOutputCommitter(
+ TaskAttemptContext context) throws IOException {
+ if (committer == null) {
+ Path output = getOutputPath(context);
+ committer = new CommitterWithCustomDeprecatedCleanup(output, context);
+ }
+ return committer;
+ }
+ }
+
+ // run a job with 1 map and let it run to completion
+ private void testSuccessfulJob(String filename,
+ Class<? extends OutputFormat> output, String[] exclude) throws Exception
{
+ Path outDir = getNewOutputDir();
+ Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 0);
+ job.setOutputFormatClass(output);
+
+ assertTrue("Job failed!", job.waitForCompletion(true));
+
+ Path testFile = new Path(outDir, filename);
+ assertTrue("Done file missing for job ", fs.exists(testFile));
+
+ // check if the files from the missing set exists
+ for (String ex : exclude) {
+ Path file = new Path(outDir, ex);
+ assertFalse(
+ "File " + file + " should not be present for successful job ", fs
+ .exists(file));
+ }
+ }
+
+ // run a job for which all the attempts simply fail.
+ private void testFailedJob(String fileName,
+ Class<? extends OutputFormat> output, String[] exclude) throws Exception
{
+ Path outDir = getNewOutputDir();
+ Job job = MapReduceTestUtil.createFailJob(conf, outDir, inDir);
+ job.setOutputFormatClass(output);
+
+ assertFalse("Job did not fail!", job.waitForCompletion(true));
+
+ if (fileName != null) {
+ Path testFile = new Path(outDir, fileName);
+ assertTrue("File " + testFile + " missing for failed job ", fs
+ .exists(testFile));
+ }
+
+ // check if the files from the missing set exists
+ for (String ex : exclude) {
+ Path file = new Path(outDir, ex);
+ assertFalse("File " + file + " should not be present for failed job ", fs
+ .exists(file));
+ }
+ }
+
+ // run a job which gets stuck in mapper and kill it.
+ private void testKilledJob(String fileName,
+ Class<? extends OutputFormat> output, String[] exclude) throws Exception
{
+ Path outDir = getNewOutputDir();
+ Job job = MapReduceTestUtil.createKillJob(conf, outDir, inDir);
+ job.setOutputFormatClass(output);
+
+ job.submit();
+
+ // wait for the setup to be completed
+ while (job.setupProgress() != 1.0f) {
+ UtilsForTests.waitFor(100);
+ }
+
+ job.killJob(); // kill the job
+
+ assertFalse("Job did not get kill", job.waitForCompletion(true));
+
+ if (fileName != null) {
+ Path testFile = new Path(outDir, fileName);
+ assertTrue("File " + testFile + " missing for job ",
fs.exists(testFile));
+ }
+
+ // check if the files from the missing set exists
+ for (String ex : exclude) {
+ Path file = new Path(outDir, ex);
+ assertFalse("File " + file + " should not be present for killed job ", fs
+ .exists(file));
+ }
+ }
+
+ /**
+ * Test default cleanup/abort behavior
+ *
+ * @throws Exception
+ */
+ public void testDefaultCleanupAndAbort() throws Exception {
+ // check with a successful job
+ testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME,
+ TextOutputFormat.class, new String[] {});
+
+ // check with a failed job
+ testFailedJob(null, TextOutputFormat.class,
+ new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME });
+
+ // check default abort job kill
+ testKilledJob(null, TextOutputFormat.class,
+ new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME });
+ }
+
+ /**
+ * Test if a failed job with custom committer runs the abort code.
+ *
+ * @throws Exception
+ */
+ public void testCustomAbort() throws Exception {
+ // check with a successful job
+ testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME,
+ MyOutputFormatWithCustomAbort.class,
+ new String[] {ABORT_FAILED_FILE_NAME,
+ ABORT_KILLED_FILE_NAME});
+
+ // check with a failed job
+ testFailedJob(ABORT_FAILED_FILE_NAME,
+ MyOutputFormatWithCustomAbort.class,
+ new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME,
+ ABORT_KILLED_FILE_NAME});
+
+ // check with a killed job
+ testKilledJob(ABORT_KILLED_FILE_NAME,
+ MyOutputFormatWithCustomAbort.class,
+ new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME,
+ ABORT_FAILED_FILE_NAME});
+ }
+
+ /**
+ * Test if a failed job with custom committer runs the deprecated
+ * {@link FileOutputCommitter#cleanupJob(JobContext)} code for api
+ * compatibility testing.
+ * @throws Exception
+ */
+ public void testCustomCleanup() throws Exception {
+
+ // check with a successful job
+ testSuccessfulJob(CUSTOM_CLEANUP_FILE_NAME,
+ MyOutputFormatWithCustomCleanup.class,
+ new String[] {});
+
+ // check with a failed job
+ testFailedJob(CUSTOM_CLEANUP_FILE_NAME,
+ MyOutputFormatWithCustomCleanup.class,
+ new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+
+ // check with a killed job
+ testKilledJob(CUSTOM_CLEANUP_FILE_NAME,
+ MyOutputFormatWithCustomCleanup.class,
+ new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+ }
+}