HIVE-16703 : Hive may add the same file to the session and vertex in Tez (Sergey Shelukhin, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8d951fa4 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8d951fa4 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8d951fa4 Branch: refs/heads/hive-14535 Commit: 8d951fa4e0665942c4c1cb44a7914f70b0604f2d Parents: 5d62dc8 Author: sergey <[email protected]> Authored: Fri May 19 17:24:27 2017 -0700 Committer: sergey <[email protected]> Committed: Fri May 19 17:24:27 2017 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/exec/tez/DagUtils.java | 29 ++++++++++++++++---- .../hive/ql/exec/tez/TezSessionState.java | 5 ++-- .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 14 +++++++--- .../hadoop/hive/ql/exec/tez/TestTezTask.java | 10 +++---- 4 files changed, 41 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/8d951fa4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index b0457be..b6e55c0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -31,6 +31,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -847,12 +848,15 @@ public class DagUtils { String hdfsDirPathStr, Configuration conf) throws IOException, LoginException { List<LocalResource> tmpResources = new ArrayList<LocalResource>(); - addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE, getTempFilesFromConf(conf)); - addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.ARCHIVE, getTempArchivesFromConf(conf)); + addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE, + getTempFilesFromConf(conf), null); + addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.ARCHIVE, + getTempArchivesFromConf(conf), null); return tmpResources; } - private static String[] getTempFilesFromConf(Configuration conf) { + public static String[] getTempFilesFromConf(Configuration conf) { + if (conf == null) return new String[0]; // In tests. String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE); if (StringUtils.isNotBlank(addedFiles)) { HiveConf.setVar(conf, ConfVars.HIVEADDEDFILES, addedFiles); @@ -888,21 +892,34 @@ public class DagUtils { * @throws LoginException when getDefaultDestDir fails with the same exception */ public List<LocalResource> localizeTempFiles(String hdfsDirPathStr, Configuration conf, - String[] inputOutputJars) throws IOException, LoginException { + String[] inputOutputJars, String[] skipJars) throws IOException, LoginException { if (inputOutputJars == null) return null; List<LocalResource> tmpResources = new ArrayList<LocalResource>(); - addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE, inputOutputJars); + addTempResources(conf, tmpResources, hdfsDirPathStr, + LocalResourceType.FILE, inputOutputJars, skipJars); return tmpResources; } private void addTempResources(Configuration conf, List<LocalResource> tmpResources, String hdfsDirPathStr, LocalResourceType type, - String[] files) throws IOException { + String[] files, String[] skipFiles) throws IOException { + HashSet<Path> skipFileSet = null; + if (skipFiles != null) { + skipFileSet = new HashSet<>(); + for (String skipFile : skipFiles) { + if (StringUtils.isBlank(skipFile)) continue; + skipFileSet.add(new Path(skipFile)); + } + } for (String file : files) { if (!StringUtils.isNotBlank(file)) { continue; } + if (skipFileSet != null && skipFileSet.contains(new Path(file))) { + LOG.info("Skipping vertex resource " + file + " that already exists in the session"); + continue; + } Path hdfsFilePath = new Path(hdfsDirPathStr, getResourceBaseName(new Path(file))); LocalResource localResource = localizeResource(new Path(file), hdfsFilePath, type, conf); http://git-wip-us.apache.org/repos/asf/hive/blob/8d951fa4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 036e918..fe5c6a1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -476,9 +476,10 @@ public class TezSessionState { localizedResources.addAll(lrs); } - // these are local resources that are set through the mr "tmpjars" property + // these are local resources that are set through the mr "tmpjars" property; skip session files. List<LocalResource> handlerLr = utils.localizeTempFiles(dir, conf, - additionalFilesNotFromConf.toArray(new String[additionalFilesNotFromConf.size()])); + additionalFilesNotFromConf.toArray(new String[additionalFilesNotFromConf.size()]), + DagUtils.getTempFilesFromConf(conf)); if (handlerLr != null) { localizedResources.addAll(handlerLr); http://git-wip-us.apache.org/repos/asf/hive/blob/8d951fa4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 1c84c6a..3356dc9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -123,6 +123,7 @@ public class TezTask extends Task<TezWork> { return counters; } + @Override public int execute(DriverContext driverContext) { int rc = 1; @@ -161,8 +162,12 @@ public class TezTask extends Task<TezWork> { // create the tez tmp dir scratchDir = utils.createTezDir(scratchDir, conf); + // This is used to compare global and vertex resources. Global resources are originally + // derived from session conf via localizeTempFilesFromConf. So, use that here. + Configuration sessionConf = + (session != null && session.getConf() != null) ? session.getConf() : conf; Map<String,LocalResource> inputOutputLocalResources = - getExtraLocalResources(jobConf, scratchDir, inputOutputJars); + getExtraLocalResources(jobConf, scratchDir, inputOutputJars, sessionConf); // Ensure the session is open and has the necessary local resources updateSession(session, jobConf, scratchDir, inputOutputJars, inputOutputLocalResources); @@ -273,10 +278,11 @@ public class TezTask extends Task<TezWork> { * Converted the list of jars into local resources */ Map<String,LocalResource> getExtraLocalResources(JobConf jobConf, Path scratchDir, - String[] inputOutputJars) throws Exception { + String[] inputOutputJars, Configuration sessionConf) throws Exception { final Map<String,LocalResource> resources = new HashMap<String,LocalResource>(); - final List<LocalResource> localResources = utils.localizeTempFiles( - scratchDir.toString(), jobConf, inputOutputJars); + // Skip the files already in session local resources... + final List<LocalResource> localResources = utils.localizeTempFiles(scratchDir.toString(), + jobConf, inputOutputJars, DagUtils.getTempFilesFromConf(sessionConf)); if (null != localResources) { for (LocalResource lr : localResources) { resources.put(utils.getBaseName(lr), lr); http://git-wip-us.apache.org/repos/asf/hive/blob/8d951fa4/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index 2b52056..70fedb7 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -245,7 +245,7 @@ public class TestTezTask { final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>(); resMap.put("foo.jar", res); - when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars)) + when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars, null)) .thenReturn(resources); when(utils.getBaseName(res)).thenReturn("foo.jar"); when(sessionState.isOpen()).thenReturn(true); @@ -264,7 +264,7 @@ public class TestTezTask { resMap.put("foo.jar", res); DAG dag = mock(DAG.class); - when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars)) + when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars, null)) .thenReturn(resources); when(utils.getBaseName(res)).thenReturn("foo.jar"); when(sessionState.isOpen()).thenReturn(true); @@ -282,11 +282,11 @@ public class TestTezTask { final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>(); resMap.put("foo.jar", res); - when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars)) - .thenReturn(resources); + when(utils.localizeTempFiles(eq(path.toString()), eq(conf), eq(inputOutputJars), + Mockito.<String[]>any())).thenReturn(resources); when(utils.getBaseName(res)).thenReturn("foo.jar"); - assertEquals(resMap, task.getExtraLocalResources(conf, path, inputOutputJars)); + assertEquals(resMap, task.getExtraLocalResources(conf, path, inputOutputJars, null)); } @Test
