Repository: hive
Updated Branches:
  refs/heads/master 188f7fb47 -> 26753ade2
HIVE-17574: Avoid multiple copies of HDFS-based jars when localizing job-jars (Chris Drome, reviewed by Mithun Radhakrishnan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/26753ade
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/26753ade
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/26753ade

Branch: refs/heads/master
Commit: 26753ade2a130339940119c950b9c9af53e3d024
Parents: 188f7fb
Author: Mithun RK <mit...@apache.org>
Authored: Thu Sep 21 16:20:32 2017 -0700
Committer: Mithun RK <mit...@apache.org>
Committed: Thu Oct 5 14:18:45 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  2 +
 .../apache/hadoop/hive/ql/exec/Utilities.java   | 20 ++++-
 .../hadoop/hive/ql/exec/tez/DagUtils.java       | 40 +++++++++-
 .../hadoop/hive/ql/session/SessionState.java    | 79 ++++++++++++++++++--
 .../hadoop/hive/ql/util/ResourceDownloader.java | 11 ++-
 .../hadoop/hive/ql/session/TestAddResource.java | 18 ++---
 6 files changed, 150 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/26753ade/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index da30b37..c95844b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1029,6 +1029,8 @@ public class HiveConf extends Configuration {
     HIVEADDEDFILES("hive.added.files.path", "", "This an internal parameter."),
     HIVEADDEDJARS("hive.added.jars.path", "", "This an internal parameter."),
     HIVEADDEDARCHIVES("hive.added.archives.path", "", "This an internal parameter."),
+    HIVEADDFILESUSEHDFSLOCATION("hive.resource.use.hdfs.location", true, "Reference HDFS-based files/jars directly, instead of " +
+        "copying them to the session-based HDFS scratch directory, to make better use of the distributed cache."),
     HIVE_CURRENT_DATABASE("hive.current.database", "",
         "Database name used by current session. Internal usage only.", true),
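
For reference, the new flag defaults to true and can be flipped per session from the CLI (SET hive.resource.use.hdfs.location=false;). Below is a minimal sketch of reading and toggling it through the HiveConf API; the class name is illustrative, not part of the patch:

import org.apache.hadoop.hive.conf.HiveConf;

public class HdfsLocationFlagSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();

    // Defaults to true, per the definition above: hdfs:// resources are
    // referenced in place instead of being copied to the scratch directory.
    boolean useHdfsLocation = conf.getBoolVar(HiveConf.ConfVars.HIVEADDFILESUSEHDFSLOCATION);
    System.out.println("hive.resource.use.hdfs.location = " + useHdfsLocation);

    // Opt back into the old copy-to-scratch-directory behavior for this conf.
    conf.setBoolVar(HiveConf.ConfVars.HIVEADDFILESUSEHDFSLOCATION, false);
  }
}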

http://git-wip-us.apache.org/repos/asf/hive/blob/26753ade/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index ae63727..8311037 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1763,9 +1763,27 @@ public final class Utilities {
   }
 
   public static String getResourceFiles(Configuration conf, SessionState.ResourceType t) {
-    // fill in local files to be added to the task environment
+    // fill in local files (includes copies of HDFS files) to be added to the task environment
     SessionState ss = SessionState.get();
     Set<String> files = (ss == null) ? null : ss.list_resource(t, null);
+    return validateFiles(conf, files);
+  }
+
+  public static String getHdfsResourceFiles(Configuration conf, SessionState.ResourceType type) {
+    // fill in HDFS files to be added to the task environment
+    SessionState ss = SessionState.get();
+    Set<String> files = (ss == null) ? null : ss.list_hdfs_resource(type);
+    return validateFiles(conf, files);
+  }
+
+  public static String getLocalResourceFiles(Configuration conf, SessionState.ResourceType type) {
+    // fill in local-only files (excludes copies of HDFS files) to be added to the task environment
+    SessionState ss = SessionState.get();
+    Set<String> files = (ss == null) ? null : ss.list_local_resource(type);
+    return validateFiles(conf, files);
+  }
+
+  private static String validateFiles(Configuration conf, Set<String> files) {
     if (files != null) {
       List<String> realFiles = new ArrayList<String>(files.size());
       for (String one : files) {


http://git-wip-us.apache.org/repos/asf/hive/blob/26753ade/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index e7f2400..aae3480 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -870,13 +870,49 @@ public class DagUtils {
       String hdfsDirPathStr, Configuration conf) throws IOException, LoginException {
     List<LocalResource> tmpResources = new ArrayList<LocalResource>();
 
-    addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE,
-        getTempFilesFromConf(conf), null);
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEADDFILESUSEHDFSLOCATION)) {
+      // Reference HDFS-based resources directly, to use the distributed cache efficiently.
+      addHdfsResource(conf, tmpResources, LocalResourceType.FILE, getHdfsTempFilesFromConf(conf));
+      // Local resources are session based.
+      addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE, getLocalTempFilesFromConf(conf), null);
+    } else {
+      // All resources, including HDFS-based ones, are session based.
+      addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE, getTempFilesFromConf(conf), null);
+    }
+
     addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.ARCHIVE,
         getTempArchivesFromConf(conf), null);
     return tmpResources;
   }
 
+  private void addHdfsResource(Configuration conf, List<LocalResource> tmpResources,
+      LocalResourceType type, String[] files) throws IOException {
+    for (String file : files) {
+      if (StringUtils.isNotBlank(file)) {
+        Path dest = new Path(file);
+        FileSystem destFS = dest.getFileSystem(conf);
+        LocalResource localResource = createLocalResource(destFS, dest, type,
+            LocalResourceVisibility.PRIVATE);
+        tmpResources.add(localResource);
+      }
+    }
+  }
+
+  private static String[] getHdfsTempFilesFromConf(Configuration conf) {
+    String addedFiles = Utilities.getHdfsResourceFiles(conf, SessionState.ResourceType.FILE);
+    String addedJars = Utilities.getHdfsResourceFiles(conf, SessionState.ResourceType.JAR);
+    String allFiles = addedJars + "," + addedFiles;
+    return allFiles.split(",");
+  }
+
+  private static String[] getLocalTempFilesFromConf(Configuration conf) {
+    String addedFiles = Utilities.getLocalResourceFiles(conf, SessionState.ResourceType.FILE);
+    String addedJars = Utilities.getLocalResourceFiles(conf, SessionState.ResourceType.JAR);
+    String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
+    String allFiles = auxJars + "," + addedJars + "," + addedFiles;
+    return allFiles.split(",");
+  }
+
   public static String[] getTempFilesFromConf(Configuration conf) {
     if (conf == null) return new String[0]; // In tests.
     String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
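
At runtime, the direct-reference branch comes down to building a YARN LocalResource from the hdfs:// file's own FileStatus rather than from a scratch-directory copy, so node managers can localize and cache the jar from its original location. The following is a self-contained sketch of that idea; the class and helper names are hypothetical, and this is not the actual DagUtils.createLocalResource implementation:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.util.ConverterUtils;

public class HdfsLocalResourceSketch {
  // Hypothetical helper: turn an hdfs:// file into a LocalResource that YARN
  // localizes (and caches) from its original location, with no extra copy.
  static LocalResource fromHdfs(Configuration conf, String hdfsUri) throws IOException {
    Path path = new Path(hdfsUri);
    FileStatus status = path.getFileSystem(conf).getFileStatus(path);
    // Size and modification time let the node manager validate its cached copy.
    return LocalResource.newInstance(
        ConverterUtils.getYarnUrlFromPath(path),
        LocalResourceType.FILE,
        LocalResourceVisibility.PRIVATE,
        status.getLen(),
        status.getModificationTime());
  }
}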

http://git-wip-us.apache.org/repos/asf/hive/blob/26753ade/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 6dece05..4820fed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -1448,7 +1448,7 @@ public class SessionState {
       String key;
 
       //get the local path of downloaded jars.
-      List<URI> downloadedURLs = resolveAndDownload(value, convertToUnix);
+      List<URI> downloadedURLs = resolveAndDownload(t, value, convertToUnix);
 
       if (ResourceDownloader.isIvyUri(value)) {
         // get the key to store in map
@@ -1494,9 +1494,14 @@ public class SessionState {
   }
 
   @VisibleForTesting
-  protected List<URI> resolveAndDownload(String value, boolean convertToUnix)
+  protected List<URI> resolveAndDownload(ResourceType resourceType, String value, boolean convertToUnix)
       throws URISyntaxException, IOException {
-    return resourceDownloader.resolveAndDownload(value, convertToUnix);
+    List<URI> uris = resourceDownloader.resolveAndDownload(value, convertToUnix);
+    if (ResourceDownloader.isHdfsUri(value)) {
+      assert uris.size() == 1 : "There should be only one URI for the localized resource.";
+      resourceMaps.getLocalHdfsLocationMap(resourceType).put(uris.get(0).toString(), value);
+    }
+    return uris;
   }
 
   public void delete_resources(ResourceType t, List<String> values) {
@@ -1514,12 +1519,18 @@ public class SessionState {
       if (ResourceDownloader.isIvyUri(value)) {
         key = ResourceDownloader.createURI(value).getAuthority();
       }
+      else if (ResourceDownloader.isHdfsUri(value)) {
+        // remove the local copy of the HDFS location from the resource map.
+        String removedKey = removeHdfsFilesFromMapping(t, value);
+        if (removedKey != null) {
+          key = removedKey;
+        }
+      }
     } catch (URISyntaxException e) {
       throw new RuntimeException("Invalid uri string " + value + ", " + e.getMessage());
     }
 
     // get all the dependencies to delete
-
     Set<String> resourcePaths = resourcePathMap.get(key);
     if (resourcePaths == null) {
       return;
@@ -1539,7 +1550,6 @@ public class SessionState {
     resources.removeAll(deleteList);
   }
 
-
   public Set<String> list_resource(ResourceType t, List<String> filter) {
     Set<String> orig = resourceMaps.getResourceSet(t);
     if (orig == null) {
@@ -1558,11 +1568,53 @@ public class SessionState {
     }
   }
 
+  private String removeHdfsFilesFromMapping(ResourceType t, String file) {
+    String key = null;
+    if (resourceMaps.getLocalHdfsLocationMap(t).containsValue(file)) {
+      for (Map.Entry<String, String> entry : resourceMaps.getLocalHdfsLocationMap(t).entrySet()) {
+        if (entry.getValue().equals(file)) {
+          key = entry.getKey();
+          resourceMaps.getLocalHdfsLocationMap(t).remove(key);
+        }
+      }
+    }
+    return key;
+  }
+
+  public Set<String> list_local_resource(ResourceType type) {
+    Set<String> resources = new HashSet<String>(list_resource(type, null));
+    Set<String> set = resourceMaps.getResourceSet(type);
+    for (String file : set) {
+      if (resourceMaps.getLocalHdfsLocationMap(ResourceType.FILE).containsKey(file)) {
+        resources.remove(file);
+      }
+      if (resourceMaps.getLocalHdfsLocationMap(ResourceType.JAR).containsKey(file)) {
+        resources.remove(file);
+      }
+    }
+    return resources;
+  }
+
+  public Set<String> list_hdfs_resource(ResourceType type) {
+    Set<String> set = resourceMaps.getResourceSet(type);
+    Set<String> result = new HashSet<String>();
+    for (String file : set) {
+      if (resourceMaps.getLocalHdfsLocationMap(ResourceType.FILE).containsKey(file)) {
+        result.add(resourceMaps.getLocalHdfsLocationMap(ResourceType.FILE).get(file));
+      }
+      if (resourceMaps.getLocalHdfsLocationMap(ResourceType.JAR).containsKey(file)) {
+        result.add(resourceMaps.getLocalHdfsLocationMap(ResourceType.JAR).get(file));
+      }
+    }
+    return result;
+  }
+
   public void delete_resources(ResourceType t) {
     Set<String> resources = resourceMaps.getResourceSet(t);
     if (resources != null && !resources.isEmpty()) {
       delete_resources(t, new ArrayList<String>(resources));
       resourceMaps.getResourceMap().remove(t);
+      resourceMaps.getAllLocalHdfsLocationMap().remove(t);
     }
   }
@@ -1951,18 +2003,24 @@ class ResourceMaps {
   private final Map<SessionState.ResourceType, Map<String, Set<String>>> resource_path_map;
   // stores all the downloaded resources as key and the jars which depend on these resources as values in form of a list. Used for deleting transitive dependencies.
   private final Map<SessionState.ResourceType, Map<String, Set<String>>> reverse_resource_path_map;
+  // stores mappings from the local copy to the hdfs location, for all resource types.
+  private final Map<SessionState.ResourceType, Map<String, String>> local_hdfs_resource_map;
 
   public ResourceMaps() {
     resource_map = new HashMap<SessionState.ResourceType, Set<String>>();
     resource_path_map = new HashMap<SessionState.ResourceType, Map<String, Set<String>>>();
     reverse_resource_path_map = new HashMap<SessionState.ResourceType, Map<String, Set<String>>>();
-
+    local_hdfs_resource_map = new HashMap<SessionState.ResourceType, Map<String, String>>();
   }
 
   public Map<SessionState.ResourceType, Set<String>> getResourceMap() {
     return resource_map;
   }
 
+  public Map<SessionState.ResourceType, Map<String, String>> getAllLocalHdfsLocationMap() {
+    return local_hdfs_resource_map;
+  }
+
   public Set<String> getResourceSet(SessionState.ResourceType t) {
     Set<String> result = resource_map.get(t);
     if (result == null) {
@@ -1990,4 +2048,13 @@ class ResourceMaps {
     return result;
   }
 
+  public Map<String, String> getLocalHdfsLocationMap(SessionState.ResourceType type) {
+    Map<String, String> result = local_hdfs_resource_map.get(type);
+    if (result == null) {
+      result = new HashMap<String, String>();
+      local_hdfs_resource_map.put(type, result);
+    }
+    return result;
+  }
+
 }
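
To make the two new views concrete, here is a standalone sketch (all paths made up) of how the local-to-HDFS mapping recorded by resolveAndDownload partitions a session's resource set into what list_hdfs_resource and list_local_resource return:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ResourcePartitionSketch {
  public static void main(String[] args) {
    // Session resource set: local paths, including one downloaded from HDFS.
    Set<String> resources = new HashSet<String>();
    resources.add("file:/tmp/session/x.jar");   // local copy of an hdfs:// jar
    resources.add("file:/home/user/y.jar");     // genuinely local jar

    // local copy -> original hdfs:// location (as recorded by resolveAndDownload).
    Map<String, String> localToHdfs = new HashMap<String, String>();
    localToHdfs.put("file:/tmp/session/x.jar", "hdfs:///user/hive/x.jar");

    Set<String> hdfsView = new HashSet<String>();            // cf. list_hdfs_resource
    Set<String> localView = new HashSet<String>(resources);  // cf. list_local_resource
    for (String file : resources) {
      String origin = localToHdfs.get(file);
      if (origin != null) {
        hdfsView.add(origin);    // report the hdfs:// origin...
        localView.remove(file);  // ...and drop the redundant local copy
      }
    }
    System.out.println("hdfs view:  " + hdfsView);   // [hdfs:///user/hive/x.jar]
    System.out.println("local view: " + localView);  // [file:/home/user/y.jar]
  }
}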

http://git-wip-us.apache.org/repos/asf/hive/blob/26753ade/ql/src/java/org/apache/hadoop/hive/ql/util/ResourceDownloader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/ResourceDownloader.java b/ql/src/java/org/apache/hadoop/hive/ql/util/ResourceDownloader.java
index faf86fb..42ed302 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/util/ResourceDownloader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/util/ResourceDownloader.java
@@ -59,6 +59,10 @@ public class ResourceDownloader {
     return "ivy".equalsIgnoreCase(createURI(value).getScheme());
   }
 
+  public static boolean isHdfsUri(String value) throws URISyntaxException {
+    return "hdfs".equalsIgnoreCase(createURI(value).getScheme());
+  }
+
   public static boolean isFileUri(String value) {
     String scheme = null;
     try {
@@ -84,7 +88,9 @@ public class ResourceDownloader {
     switch (getURLType(source)) {
     case FILE: return isLocalAllowed ? Lists.newArrayList(source) : null;
     case IVY: return dependencyResolver.downloadDependencies(source);
-    case OTHER: return Lists.newArrayList(
+    case HDFS:
+    case OTHER:
+      return Lists.newArrayList(
         createURI(downloadResource(source, subDir, convertToUnix)));
     default: throw new AssertionError(getURLType(source));
     }
@@ -117,13 +123,14 @@ public class ResourceDownloader {
     }
   }
 
-  private enum UriType { IVY, FILE, OTHER };
+  private enum UriType { IVY, FILE, HDFS, OTHER };
 
   private static ResourceDownloader.UriType getURLType(URI value) throws URISyntaxException {
    String scheme = value.getScheme();
    if (scheme == null) return UriType.FILE;
    scheme = scheme.toLowerCase();
    if ("ivy".equals(scheme)) return UriType.IVY;
    if ("file".equals(scheme)) return UriType.FILE;
+   if ("hdfs".equals(scheme)) return UriType.HDFS;
    return UriType.OTHER;
   }
 }
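
Note that hdfs:// URIs still fall through to the same download path as OTHER, so the session keeps a local copy for its own use; the new SessionState map is what remembers the original HDFS location for DagUtils to hand to Tez. A small usage sketch of the new predicate (paths made up; this assumes the patched ResourceDownloader above is on the classpath):

import java.net.URISyntaxException;

import org.apache.hadoop.hive.ql.util.ResourceDownloader;

public class HdfsUriCheckSketch {
  public static void main(String[] args) throws URISyntaxException {
    // Scheme-based classification, as added above.
    System.out.println(ResourceDownloader.isHdfsUri("hdfs:///user/hive/x.jar")); // true
    System.out.println(ResourceDownloader.isHdfsUri("file:///tmp/y.jar"));       // false
    System.out.println(ResourceDownloader.isHdfsUri("/tmp/z.jar"));              // false (no scheme)
  }
}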

http://git-wip-us.apache.org/repos/asf/hive/blob/26753ade/ql/src/test/org/apache/hadoop/hive/ql/session/TestAddResource.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/session/TestAddResource.java b/ql/src/test/org/apache/hadoop/hive/ql/session/TestAddResource.java
index dafbe16..4b74972 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/session/TestAddResource.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/session/TestAddResource.java
@@ -83,7 +83,7 @@ public class TestAddResource {
     list.add(createURI(TEST_JAR_DIR + "testjar5.jar"));
 
     //return all the dependency urls
-    Mockito.when(ss.resolveAndDownload(query, false)).thenReturn(list);
+    Mockito.when(ss.resolveAndDownload(t, query, false)).thenReturn(list);
     addList.add(query);
     ss.add_resources(t, addList);
     Set<String> dependencies = ss.list_resource(t, null);
@@ -119,7 +119,7 @@ public class TestAddResource {
 
     Collections.sort(list);
 
-    Mockito.when(ss.resolveAndDownload(query, false)).thenReturn(list);
+    Mockito.when(ss.resolveAndDownload(t, query, false)).thenReturn(list);
     for (int i = 0; i < 10; i++) {
       addList.add(query);
     }
@@ -157,8 +157,8 @@ public class TestAddResource {
     list2.add(createURI(TEST_JAR_DIR + "testjar3.jar"));
     list2.add(createURI(TEST_JAR_DIR + "testjar4.jar"));
 
-    Mockito.when(ss.resolveAndDownload(query1, false)).thenReturn(list1);
-    Mockito.when(ss.resolveAndDownload(query2, false)).thenReturn(list2);
+    Mockito.when(ss.resolveAndDownload(t, query1, false)).thenReturn(list1);
+    Mockito.when(ss.resolveAndDownload(t, query2, false)).thenReturn(list2);
     addList.add(query1);
     addList.add(query2);
     ss.add_resources(t, addList);
@@ -208,8 +208,8 @@ public class TestAddResource {
     Collections.sort(list1);
     Collections.sort(list2);
 
-    Mockito.when(ss.resolveAndDownload(query1, false)).thenReturn(list1);
-    Mockito.when(ss.resolveAndDownload(query2, false)).thenReturn(list2);
+    Mockito.when(ss.resolveAndDownload(t, query1, false)).thenReturn(list1);
+    Mockito.when(ss.resolveAndDownload(t, query2, false)).thenReturn(list2);
     addList.add(query1);
     addList.add(query2);
     ss.add_resources(t, addList);
@@ -267,9 +267,9 @@ public class TestAddResource {
     Collections.sort(list2);
     Collections.sort(list3);
 
-    Mockito.when(ss.resolveAndDownload(query1, false)).thenReturn(list1);
-    Mockito.when(ss.resolveAndDownload(query2, false)).thenReturn(list2);
-    Mockito.when(ss.resolveAndDownload(query3, false)).thenReturn(list3);
+    Mockito.when(ss.resolveAndDownload(t, query1, false)).thenReturn(list1);
+    Mockito.when(ss.resolveAndDownload(t, query2, false)).thenReturn(list2);
+    Mockito.when(ss.resolveAndDownload(t, query3, false)).thenReturn(list3);
     addList.add(query1);
     addList.add(query2);
     addList.add(query3);
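
The test churn is mechanical: resolveAndDownload now takes the ResourceType first, since SessionState files the local-copy-to-HDFS-origin mapping under that type. A condensed fragment of the updated stubbing pattern, reusing the fixture assumed by the tests above (the Mockito spy ss, resource type t, TEST_JAR_DIR, and the createURI helper); the hdfs:// query string is made up to highlight the new code path:

    List<URI> localized = new LinkedList<URI>();
    localized.add(createURI(TEST_JAR_DIR + "testjar1.jar"));
    // The stub must now match on the resource type as well as the query string.
    Mockito.when(ss.resolveAndDownload(t, "hdfs:///tmp/testjar1.jar", false)).thenReturn(localized);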