Updated Branches: refs/heads/master 809bbd964 -> 714733f0a
TEZ-180. Job fails when map output compression is enabled. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/714733f0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/714733f0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/714733f0 Branch: refs/heads/master Commit: 714733f0a5c61b9f4d5e46f3e2ccc1b14dda3061 Parents: 809bbd9 Author: Hitesh Shah <[email protected]> Authored: Tue Jun 4 17:39:55 2013 -0700 Committer: Hitesh Shah <[email protected]> Committed: Tue Jun 4 17:39:55 2013 -0700 ---------------------------------------------------------------------- .../org/apache/tez/engine/common/ConfigUtils.java | 2 +- .../java/org/apache/tez/mapreduce/TestMRRJobs.java | 50 ++++++++++++++- 2 files changed, 49 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/714733f0/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java index f14bb3e..a92cf1b 100644 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java +++ b/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java @@ -74,7 +74,7 @@ public class ConfigUtils { public static boolean isIntermediateInputCompressed(Configuration conf) { return conf.getBoolean( - TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, false); + TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_IS_COMPRESSED, false); } public static <V> Class<V> getIntermediateOutputValueClass(Configuration conf) { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/714733f0/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java index 520a1ac..6fb22e6 100644 --- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java +++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobCounter; @@ -216,7 +217,7 @@ public class TestMRRJobs { public void testFailingJob() throws IOException, InterruptedException, ClassNotFoundException { - LOG.info("\n\n\nStarting testFailingMapper()."); + LOG.info("\n\n\nStarting testFailingJob()."); if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) { LOG.info("MRAppJar " + MiniMRRTezCluster.APPJAR @@ -252,7 +253,7 @@ public class TestMRRJobs { public void testFailingAttempt() throws IOException, InterruptedException, ClassNotFoundException { - LOG.info("\n\n\nStarting testFailingMapper()."); + LOG.info("\n\n\nStarting testFailingAttempt()."); if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) { LOG.info("MRAppJar " + MiniMRRTezCluster.APPJAR @@ -284,6 +285,51 @@ public class TestMRRJobs { // TODO verify failed task diagnostics } + @Test (timeout = 300000) + public void testMRRSleepJobWithCompression() throws IOException, + InterruptedException, ClassNotFoundException { + LOG.info("\n\n\nStarting testMRRSleepJobWithCompression()."); + + if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) { + LOG.info("MRAppJar " + MiniMRRTezCluster.APPJAR + + " not found. Not running test."); + return; + } + + Configuration sleepConf = new Configuration(mrrTezCluster.getConfig()); + + MRRSleepJob sleepJob = new MRRSleepJob(); + sleepJob.setConf(sleepConf); + + Job job = sleepJob.createJob(1, 1, 2, 1, 1, + 1, 1, 1, 1, 1); + + job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. + job.addFileToClassPath(YARN_SITE_XML); + job.setJarByClass(MRRSleepJob.class); + job.setMaxMapAttempts(1); // speed up failures + + // enable compression + job.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true); + job.getConfiguration().set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, + DefaultCodec.class.getName()); + + job.submit(); + String trackingUrl = job.getTrackingURL(); + String jobId = job.getJobID().toString(); + boolean succeeded = job.waitForCompletion(true); + Assert.assertTrue(succeeded); + Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); + Assert.assertTrue("Tracking URL was " + trackingUrl + + " but didn't Match Job ID " + jobId , + trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/")); + + // FIXME once counters and task progress can be obtained properly + // TODO use dag client to test counters and task progress? + // what about completed jobs? + + } + /* //@Test (timeout = 60000)
