Repository: tez
Updated Branches:
  refs/heads/master 18da49398 -> 6adfb5dad
TEZ-3276. Tez Example MRRSleep job fails when tez.staging-dir fs is not same as default FS. (Harish Jaiprakash via hitesh)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6adfb5da
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6adfb5da
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6adfb5da

Branch: refs/heads/master
Commit: 6adfb5dad88994803cadd378453ec965e2a947f8
Parents: 18da493
Author: Hitesh Shah <[email protected]>
Authored: Tue May 31 21:30:00 2016 -0700
Committer: Hitesh Shah <[email protected]>
Committed: Tue May 31 21:30:00 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/mapreduce/examples/MRRSleepJob.java     | 30 +++++++++-----------
 2 files changed, 15 insertions(+), 16 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/6adfb5da/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 81aedda..420ee58 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3276. Tez Example MRRSleep job fails when tez.staging-dir fs is not same as default FS.
   TEZ-3280. LOG MRInputHelpers split generation message as INFO
   TEZ-909. Provide support for application tags
   TEZ-3257. Fix flaky test TestUnorderedPartitionedKVWriter.
http://git-wip-us.apache.org/repos/asf/tez/blob/6adfb5da/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java index 0a6d9a1..716910d 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java @@ -402,7 +402,7 @@ public class MRRSleepJob extends Configured implements Tool { private Credentials credentials = new Credentials(); - public DAG createDAG(FileSystem remoteFs, Configuration conf, Path remoteStagingDir, + public DAG createDAG(Configuration conf, Path stagingDir, int numMapper, int numReducer, int iReduceStagesCount, int numIReducer, long mapSleepTime, int mapSleepCount, long reduceSleepTime, int reduceSleepCount, @@ -488,7 +488,7 @@ public class MRRSleepJob extends Configured implements Tool { LOG.info("Writing splits to DFS"); dataSource = MRInputHelpers - .configureMRInputWithLegacySplitGeneration(mapStageConf, remoteStagingDir, true); + .configureMRInputWithLegacySplitGeneration(mapStageConf, stagingDir, true); } else { dataSource = MRInputLegacy.createConfigBuilder(mapStageConf, SleepInputFormat.class) .generateSplitsInAM(generateSplitsInAM).build(); @@ -500,11 +500,12 @@ public class MRRSleepJob extends Configured implements Tool { throw new TezUncheckedException("Could not find any jar containing" + " MRRSleepJob.class in the classpath"); } - Path remoteJarPath = remoteFs.makeQualified( - new Path(remoteStagingDir, "dag_job.jar")); - remoteFs.copyFromLocalFile(new Path(jarPath), remoteJarPath); - FileStatus jarFileStatus = remoteFs.getFileStatus(remoteJarPath); - + + FileSystem stagingFs = stagingDir.getFileSystem(conf); + Path remoteJarPath = new Path(stagingDir, "dag_job.jar"); + 
stagingFs.copyFromLocalFile(new Path(jarPath), remoteJarPath); + FileStatus jarFileStatus = stagingFs.getFileStatus(remoteJarPath); + TokenCache.obtainTokensForNamenodes(this.credentials, new Path[] { remoteJarPath }, mapStageConf); @@ -729,21 +730,18 @@ public class MRRSleepJob extends Configured implements Tool { iReduceSleepCount = (int)Math.ceil(iReduceSleepTime / ((double)recSleepTime)); TezConfiguration conf = new TezConfiguration(getConf()); - FileSystem remoteFs = FileSystem.get(conf); conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, conf.get( TezConfiguration.TEZ_AM_STAGING_DIR, TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT)); - - Path remoteStagingDir = - remoteFs.makeQualified(new Path(conf.get( - TezConfiguration.TEZ_AM_STAGING_DIR, - TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT), - Long.toString(System.currentTimeMillis()))); - TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir); - DAG dag = createDAG(remoteFs, conf, remoteStagingDir, + String stagingBaseDir = conf.get(TezConfiguration.TEZ_AM_STAGING_DIR, + TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT); + Path stagingDir = new Path(stagingBaseDir, Long.toString(System.currentTimeMillis())); + TezClientUtils.ensureStagingDirExists(conf, stagingDir); + + DAG dag = createDAG(conf, stagingDir, numMapper, numReducer, iReduceStagesCount, numIReducer, mapSleepTime, mapSleepCount, reduceSleepTime, reduceSleepCount, iReduceSleepTime, iReduceSleepCount, writeSplitsToDfs, generateSplitsInAM);
