TEZ-2199. updateLocalResourcesForInputSplits wrongly assumes that split data is on the same FS as the default FS. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/64df8291 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/64df8291 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/64df8291 Branch: refs/heads/TEZ-2003 Commit: 64df8291c022afaab99de0974b227d02dd3c3b33 Parents: 2960fd1 Author: Hitesh Shah <[email protected]> Authored: Tue Mar 17 11:03:29 2015 -0700 Committer: Hitesh Shah <[email protected]> Committed: Tue Mar 17 11:03:29 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/mapreduce/hadoop/MRInputHelpers.java | 12 +++-- .../mapreduce/hadoop/TestMRInputHelpers.java | 51 ++++++++++++++++---- 3 files changed, 50 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/64df8291/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index bc37de0..e4d7641 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -244,6 +244,7 @@ TEZ-UI CHANGES (TEZ-8): Release 0.5.4: Unreleased ALL CHANGES: + TEZ-2199. updateLocalResourcesForInputSplits assumes wrongly that split data is on same FS as the default FS. TEZ-2162. org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat is not recognized TEZ-2193. Check returned value from EdgeManagerPlugin before using it TEZ-2133. 
Secured Impersonation: Failed to delete tez scratch data dir http://git-wip-us.apache.org/repos/asf/tez/blob/64df8291/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java index ca02809..baf9a86 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java @@ -112,7 +112,7 @@ public class MRInputHelpers { .getName() : MRInput.class.getName()) .setUserPayload(createMRInputPayload(conf, null)); Map<String, LocalResource> additionalLocalResources = new HashMap<String, LocalResource>(); - updateLocalResourcesForInputSplits(FileSystem.get(conf), inputSplitInfo, + updateLocalResourcesForInputSplits(conf, inputSplitInfo, additionalLocalResources); DataSourceDescriptor dsd = DataSourceDescriptor.create(inputDescriptor, null, inputSplitInfo.getNumTasks(), @@ -618,13 +618,13 @@ public class MRInputHelpers { * Update provided localResources collection with the required local * resources needed by MapReduce tasks with respect to Input splits. 
* - * @param fs Filesystem instance to access status of splits related files + * @param conf Configuration * @param inputSplitInfo Information on location of split files * @param localResources LocalResources collection to be updated * @throws IOException */ private static void updateLocalResourcesForInputSplits( - FileSystem fs, + Configuration conf, InputSplitInfo inputSplitInfo, Map<String, LocalResource> localResources) throws IOException { if (localResources.containsKey(JOB_SPLIT_RESOURCE_NAME)) { @@ -636,10 +636,12 @@ public class MRInputHelpers { + " resource named " + JOB_SPLIT_METAINFO_RESOURCE_NAME); } + FileSystem splitsFS = inputSplitInfo.getSplitsFile().getFileSystem(conf); FileStatus splitFileStatus = - fs.getFileStatus(inputSplitInfo.getSplitsFile()); + splitsFS.getFileStatus(inputSplitInfo.getSplitsFile()); FileStatus metaInfoFileStatus = - fs.getFileStatus(inputSplitInfo.getSplitsMetaInfoFile()); + splitsFS.getFileStatus(inputSplitInfo.getSplitsMetaInfoFile()); + localResources.put(JOB_SPLIT_RESOURCE_NAME, LocalResource.newInstance( ConverterUtils.getYarnUrlFromPath(inputSplitInfo.getSplitsFile()), http://git-wip-us.apache.org/repos/asf/tez/blob/64df8291/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java index 58ad053..88cc4a5 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java @@ -18,8 +18,6 @@ package org.apache.tez.mapreduce.hadoop; -import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -28,6 +26,7 @@ import java.util.List; import java.util.Map; 
import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; @@ -35,7 +34,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.split.JobSplit; import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader; @@ -74,23 +74,26 @@ public class TestMRInputHelpers { Configuration testConf = new YarnConfiguration( dfsCluster.getFileSystem().getConf()); - File testConfFile = new File(TEST_ROOT_DIR, "test.xml"); + + FSDataOutputStream dataOutputStream = null; try { - testConfFile.createNewFile(); - testConf.writeXml(new FileOutputStream(testConfFile)); - testConfFile.deleteOnExit(); + dataOutputStream = remoteFs.create(new Path("/tmp/input/test.xml"), true); + testConf.writeXml(dataOutputStream); + dataOutputStream.hsync(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); throw new RuntimeException(e); + } finally { + if (dataOutputStream != null) { + dataOutputStream.close(); + } } remoteFs.mkdirs(new Path("/tmp/input/")); remoteFs.mkdirs(new Path("/tmp/splitsDirNew/")); remoteFs.mkdirs(new Path("/tmp/splitsDirOld/")); testFilePath = remoteFs.makeQualified(new Path("/tmp/input/test.xml")); - remoteFs.copyFromLocalFile(new Path(testConfFile.getAbsolutePath()), - testFilePath); FileStatus fsStatus = remoteFs.getFileStatus(testFilePath); Assert.assertTrue(fsStatus.getLen() > 0); @@ -226,4 +229,34 @@ public class TestMRInputHelpers { return MRInputHelpers.configureMRInputWithLegacySplitGeneration(jobConf, inputSplitsDir, true); } + + @Test(timeout = 
5000) + public void testInputSplitLocalResourceCreationWithDifferentFS() throws Exception { + FileSystem localFs = FileSystem.getLocal(conf); + Path LOCAL_TEST_ROOT_DIR = new Path("target" + + Path.SEPARATOR + TestMRHelpers.class.getName() + "-localtmpDir"); + + try { + localFs.mkdirs(LOCAL_TEST_ROOT_DIR); + + Path splitsDir = localFs.resolvePath(LOCAL_TEST_ROOT_DIR); + + DataSourceDescriptor dataSource = generateDataSourceDescriptorMapRed(splitsDir); + + Map<String, LocalResource> localResources = dataSource.getAdditionalLocalFiles(); + + Assert.assertEquals(2, localResources.size()); + Assert.assertTrue(localResources.containsKey( + MRInputHelpers.JOB_SPLIT_RESOURCE_NAME)); + Assert.assertTrue(localResources.containsKey( + MRInputHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME)); + + for (LocalResource lr : localResources.values()) { + Assert.assertFalse(lr.getResource().getScheme().contains(remoteFs.getScheme())); + } + } finally { + localFs.delete(LOCAL_TEST_ROOT_DIR, true); + } + } + }
