Repository: hive
Updated Branches:
  refs/heads/master 188f7fb47 -> 26753ade2


HIVE-17574: Avoid multiple copies of HDFS-based jars when localizing job-jars (Chris Drome, reviewed by Mithun Radhakrishnan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/26753ade
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/26753ade
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/26753ade

Branch: refs/heads/master
Commit: 26753ade2a130339940119c950b9c9af53e3d024
Parents: 188f7fb
Author: Mithun RK <mit...@apache.org>
Authored: Thu Sep 21 16:20:32 2017 -0700
Committer: Mithun RK <mit...@apache.org>
Committed: Thu Oct 5 14:18:45 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  2 +
 .../apache/hadoop/hive/ql/exec/Utilities.java   | 20 ++++-
 .../hadoop/hive/ql/exec/tez/DagUtils.java       | 40 +++++++++-
 .../hadoop/hive/ql/session/SessionState.java    | 79 ++++++++++++++++++--
 .../hadoop/hive/ql/util/ResourceDownloader.java | 11 ++-
 .../hadoop/hive/ql/session/TestAddResource.java | 18 ++---
 6 files changed, 150 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/26753ade/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index da30b37..c95844b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1029,6 +1029,8 @@ public class HiveConf extends Configuration {
     HIVEADDEDFILES("hive.added.files.path", "", "This an internal parameter."),
     HIVEADDEDJARS("hive.added.jars.path", "", "This an internal parameter."),
     HIVEADDEDARCHIVES("hive.added.archives.path", "", "This an internal 
parameter."),
+    HIVEADDFILESUSEHDFSLOCATION("hive.resource.use.hdfs.location", true, 
"Reference HDFS based files/jars directly instead of "
+        + "copy to session based HDFS scratch directory, to make distributed 
cache more useful."),
 
     HIVE_CURRENT_DATABASE("hive.current.database", "", "Database name used by 
current session. Internal usage only.", true),
 

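The new flag defaults to true, so HDFS-hosted files and jars are referenced in place unless a session opts out. A minimal sketch of reading and overriding the flag through HiveConf (the standalone class and main() are illustrative, not part of this patch):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class HdfsLocationFlagDemo {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Opt out: fall back to copying HDFS resources into the session scratch directory.
        conf.setBoolVar(HiveConf.ConfVars.HIVEADDFILESUSEHDFSLOCATION, false);
        boolean useHdfsLocation =
            HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEADDFILESUSEHDFSLOCATION);
        System.out.println("hive.resource.use.hdfs.location = " + useHdfsLocation);
      }
    }

A session can do the same with SET hive.resource.use.hdfs.location=false;.
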
http://git-wip-us.apache.org/repos/asf/hive/blob/26753ade/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index ae63727..8311037 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1763,9 +1763,27 @@ public final class Utilities {
   }
 
   public static String getResourceFiles(Configuration conf, SessionState.ResourceType t) {
-    // fill in local files to be added to the task environment
+    // fill in local files (includes copies of HDFS files) to be added to the task environment
     SessionState ss = SessionState.get();
     Set<String> files = (ss == null) ? null : ss.list_resource(t, null);
+    return validateFiles(conf, files);
+  }
+
+  public static String getHdfsResourceFiles(Configuration conf, SessionState.ResourceType type) {
+    // fill in HDFS files to be added to the task environment
+    SessionState ss = SessionState.get();
+    Set<String> files = (ss == null) ? null : ss.list_hdfs_resource(type);
+    return validateFiles(conf, files);
+  }
+
+  public static String getLocalResourceFiles(Configuration conf, SessionState.ResourceType type) {
+    // fill in local-only files (excludes copies of HDFS files) to be added to the task environment
+    SessionState ss = SessionState.get();
+    Set<String> files = (ss == null) ? null : ss.list_local_resource(type);
+    return validateFiles(conf, files);
+  }
+
+  private static String validateFiles(Configuration conf, Set<String> files) {
     if (files != null) {
       List<String> realFiles = new ArrayList<String>(files.size());
       for (String one : files) {

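After this refactor, getResourceFiles() remains the union view (localized paths for everything), the two new getters split that view by origin, and all three funnel through the shared validateFiles() helper. A sketch of the intended call pattern, assuming a started session, a Hive/Hadoop classpath, and a reachable HDFS (the jar location below is hypothetical):

    import java.util.Arrays;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.Utilities;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class ResourceViewsDemo {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        SessionState ss = SessionState.start(conf);  // the getters read the current SessionState
        ss.add_resources(SessionState.ResourceType.JAR,
            Arrays.asList("hdfs://namenode:8020/apps/udf.jar"));  // hypothetical path

        // Comma-separated local paths, including localized copies of HDFS jars:
        String all = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
        // Only the original hdfs:// URIs:
        String hdfsOnly = Utilities.getHdfsResourceFiles(conf, SessionState.ResourceType.JAR);
        // Only jars that never lived on HDFS:
        String localOnly = Utilities.getLocalResourceFiles(conf, SessionState.ResourceType.JAR);
        System.out.println(all + "\n" + hdfsOnly + "\n" + localOnly);
      }
    }
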
http://git-wip-us.apache.org/repos/asf/hive/blob/26753ade/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index e7f2400..aae3480 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -870,13 +870,49 @@ public class DagUtils {
       String hdfsDirPathStr, Configuration conf) throws IOException, LoginException {
     List<LocalResource> tmpResources = new ArrayList<LocalResource>();
 
-    addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE,
-        getTempFilesFromConf(conf), null);
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEADDFILESUSEHDFSLOCATION)) {
+      // reference HDFS-based resources directly, to use the distributed cache efficiently.
+      addHdfsResource(conf, tmpResources, LocalResourceType.FILE, getHdfsTempFilesFromConf(conf));
+      // local resources are session-based.
+      addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE, getLocalTempFilesFromConf(conf), null);
+    } else {
+      // all resources, including those on HDFS, are session-based.
+      addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE, getTempFilesFromConf(conf), null);
+    }
+
     addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.ARCHIVE,
         getTempArchivesFromConf(conf), null);
     return tmpResources;
   }
 
+  private void addHdfsResource(Configuration conf, List<LocalResource> tmpResources,
+                               LocalResourceType type, String[] files) throws IOException {
+    for (String file : files) {
+      if (StringUtils.isNotBlank(file)) {
+        Path dest = new Path(file);
+        FileSystem destFS = dest.getFileSystem(conf);
+        LocalResource localResource = createLocalResource(destFS, dest, type,
+            LocalResourceVisibility.PRIVATE);
+        tmpResources.add(localResource);
+      }
+    }
+  }
+
+  private static String[] getHdfsTempFilesFromConf(Configuration conf) {
+    String addedFiles = Utilities.getHdfsResourceFiles(conf, SessionState.ResourceType.FILE);
+    String addedJars = Utilities.getHdfsResourceFiles(conf, SessionState.ResourceType.JAR);
+    String allFiles = addedJars + "," + addedFiles;
+    return allFiles.split(",");
+  }
+
+  private static String[] getLocalTempFilesFromConf(Configuration conf) {
+    String addedFiles = Utilities.getLocalResourceFiles(conf, SessionState.ResourceType.FILE);
+    String addedJars = Utilities.getLocalResourceFiles(conf, SessionState.ResourceType.JAR);
+    String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
+    String allFiles = auxJars + "," + addedJars + "," + addedFiles;
+    return allFiles.split(",");
+  }
+
   public static String[] getTempFilesFromConf(Configuration conf) {
     if (conf == null) return new String[0]; // In tests.
     String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);

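One subtlety in the two new helpers: the comma-joins produce empty fragments whenever a side is empty (for example, jars but no files), so split(",") can hand addHdfsResource() blank entries. The StringUtils.isNotBlank() guard is what keeps those from turning into bogus Paths. A self-contained sketch of that edge case (the paths are illustrative):

    public class CommaJoinDemo {
      public static void main(String[] args) {
        String addedJars = "";  // no HDFS jars registered in this session
        String addedFiles = "hdfs://namenode:8020/apps/lookup.dat";
        // Mirrors the join in getHdfsTempFilesFromConf(); note the leading empty fragment.
        String[] parts = (addedJars + "," + addedFiles).split(",");
        for (String part : parts) {
          if (!part.trim().isEmpty()) {  // stands in for StringUtils.isNotBlank()
            System.out.println("would create a LocalResource for: " + part);
          } else {
            System.out.println("skipped a blank fragment");
          }
        }
      }
    }
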
http://git-wip-us.apache.org/repos/asf/hive/blob/26753ade/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 6dece05..4820fed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -1448,7 +1448,7 @@ public class SessionState {
         String key;
 
         //get the local path of downloaded jars.
-        List<URI> downloadedURLs = resolveAndDownload(value, convertToUnix);
+        List<URI> downloadedURLs = resolveAndDownload(t, value, convertToUnix);
 
         if (ResourceDownloader.isIvyUri(value)) {
           // get the key to store in map
@@ -1494,9 +1494,14 @@ public class SessionState {
   }
 
   @VisibleForTesting
-  protected List<URI> resolveAndDownload(String value, boolean convertToUnix)
+  protected List<URI> resolveAndDownload(ResourceType resourceType, String value, boolean convertToUnix)
       throws URISyntaxException, IOException {
-    return resourceDownloader.resolveAndDownload(value, convertToUnix);
+    List<URI> uris = resourceDownloader.resolveAndDownload(value, convertToUnix);
+    if (ResourceDownloader.isHdfsUri(value)) {
+      assert uris.size() == 1 : "There should be only one URI for a localized resource.";
+      resourceMaps.getLocalHdfsLocationMap(resourceType).put(uris.get(0).toString(), value);
+    }
+    }
+    return uris;
   }
 
   public void delete_resources(ResourceType t, List<String> values) {
@@ -1514,12 +1519,18 @@ public class SessionState {
         if (ResourceDownloader.isIvyUri(value)) {
           key = ResourceDownloader.createURI(value).getAuthority();
         }
+        else if (ResourceDownloader.isHdfsUri(value)) {
+          String removedKey = removeHdfsFilesFromMapping(t, value);
+          // remove local copy of HDFS location from resource map.
+          if (removedKey != null) {
+            key = removedKey;
+          }
+        }
       } catch (URISyntaxException e) {
         throw new RuntimeException("Invalid uri string " + value + ", " + e.getMessage());
       }
 
       // get all the dependencies to delete
-
       Set<String> resourcePaths = resourcePathMap.get(key);
       if (resourcePaths == null) {
         return;
@@ -1539,7 +1550,6 @@ public class SessionState {
     resources.removeAll(deleteList);
   }
 
-
   public Set<String> list_resource(ResourceType t, List<String> filter) {
     Set<String> orig = resourceMaps.getResourceSet(t);
     if (orig == null) {
@@ -1558,11 +1568,53 @@ public class SessionState {
     }
   }
 
+  private String removeHdfsFilesFromMapping(ResourceType t, String file) {
+    String key = null;
+    if (resourceMaps.getLocalHdfsLocationMap(t).containsValue(file)) {
+      for (Map.Entry<String, String> entry : resourceMaps.getLocalHdfsLocationMap(t).entrySet()) {
+        if (entry.getValue().equals(file)) {
+          key = entry.getKey();
+          resourceMaps.getLocalHdfsLocationMap(t).remove(key);
+        }
+      }
+    }
+    return key;
+  }
+
+  public Set<String> list_local_resource(ResourceType type) {
+    Set<String> resources = new HashSet<String>(list_resource(type, null));
+    Set<String> set = resourceMaps.getResourceSet(type);
+    for (String file : set) {
+      if (resourceMaps.getLocalHdfsLocationMap(ResourceType.FILE).containsKey(file)) {
+        resources.remove(file);
+      }
+      if (resourceMaps.getLocalHdfsLocationMap(ResourceType.JAR).containsKey(file)) {
+        resources.remove(file);
+      }
+    }
+    return resources;
+  }
+
+  public Set<String> list_hdfs_resource(ResourceType type) {
+    Set<String> set = resourceMaps.getResourceSet(type);
+    Set<String> result = new HashSet<String>();
+    for (String file : set) {
+      if (resourceMaps.getLocalHdfsLocationMap(ResourceType.FILE).containsKey(file)) {
+        result.add(resourceMaps.getLocalHdfsLocationMap(ResourceType.FILE).get(file));
+      }
+      if (resourceMaps.getLocalHdfsLocationMap(ResourceType.JAR).containsKey(file)) {
+        result.add(resourceMaps.getLocalHdfsLocationMap(ResourceType.JAR).get(file));
+      }
+    }
+    return result;
+  }
+
   public void delete_resources(ResourceType t) {
     Set<String> resources = resourceMaps.getResourceSet(t);
     if (resources != null && !resources.isEmpty()) {
       delete_resources(t, new ArrayList<String>(resources));
       resourceMaps.getResourceMap().remove(t);
+      resourceMaps.getAllLocalHdfsLocationMap().remove(t);
     }
   }
 
@@ -1951,18 +2003,24 @@ class ResourceMaps {
   private final Map<SessionState.ResourceType, Map<String, Set<String>>> resource_path_map;
   // stores all the downloaded resources as keys, and the jars which depend on these resources as values, in the form of a list. Used for deleting transitive dependencies.
   private final Map<SessionState.ResourceType, Map<String, Set<String>>> reverse_resource_path_map;
+  // stores mappings from local paths to HDFS locations, for all resource types.
+  private final Map<SessionState.ResourceType, Map<String, String>> local_hdfs_resource_map;
 
   public ResourceMaps() {
     resource_map = new HashMap<SessionState.ResourceType, Set<String>>();
     resource_path_map = new HashMap<SessionState.ResourceType, Map<String, Set<String>>>();
     reverse_resource_path_map = new HashMap<SessionState.ResourceType, Map<String, Set<String>>>();
-
+    local_hdfs_resource_map = new HashMap<SessionState.ResourceType, Map<String, String>>();
   }
 
   public Map<SessionState.ResourceType, Set<String>> getResourceMap() {
     return resource_map;
   }
 
+  public Map<SessionState.ResourceType, Map<String, String>> getAllLocalHdfsLocationMap() {
+    return local_hdfs_resource_map;
+  }
+
   public Set<String> getResourceSet(SessionState.ResourceType t) {
     Set<String> result = resource_map.get(t);
     if (result == null) {
@@ -1990,4 +2048,13 @@ class ResourceMaps {
     return result;
   }
 
+  public Map<String, String> getLocalHdfsLocationMap(SessionState.ResourceType type) {
+    Map<String, String> result = local_hdfs_resource_map.get(type);
+    if (result == null) {
+      result = new HashMap<String, String>();
+      local_hdfs_resource_map.put(type, result);
+    }
+    return result;
+  }
+
 }

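The bookkeeping above hinges on one map per resource type: resolveAndDownload() records a local-copy-to-hdfs-URI entry for each resource that came from HDFS, list_hdfs_resource() and list_local_resource() partition the session's resources using those entries, and removeHdfsFilesFromMapping() retires an entry on delete. A caveat worth noting: removeHdfsFilesFromMapping() calls remove() on the map while iterating its entrySet(), which a HashMap may answer with a ConcurrentModificationException if two local copies ever map to the same HDFS file. A defensive variant (a sketch only, not what this patch commits) drives the removal through an explicit Iterator:

    import java.util.Iterator;
    import java.util.Map;

    // Drop-in sketch for SessionState: same contract as the committed method,
    // but entries are removed through the iterator, so mid-loop removal is safe.
    private String removeHdfsFilesFromMapping(ResourceType t, String file) {
      String key = null;
      Map<String, String> localToHdfs = resourceMaps.getLocalHdfsLocationMap(t);
      Iterator<Map.Entry<String, String>> it = localToHdfs.entrySet().iterator();
      while (it.hasNext()) {
        Map.Entry<String, String> entry = it.next();
        if (entry.getValue().equals(file)) {
          key = entry.getKey();
          it.remove();
        }
      }
      return key;
    }
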
http://git-wip-us.apache.org/repos/asf/hive/blob/26753ade/ql/src/java/org/apache/hadoop/hive/ql/util/ResourceDownloader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/ResourceDownloader.java b/ql/src/java/org/apache/hadoop/hive/ql/util/ResourceDownloader.java
index faf86fb..42ed302 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/util/ResourceDownloader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/util/ResourceDownloader.java
@@ -59,6 +59,10 @@ public class ResourceDownloader {
     return "ivy".equalsIgnoreCase(createURI(value).getScheme());
   }
 
+  public static boolean isHdfsUri(String value) throws URISyntaxException {
+    return "hdfs".equalsIgnoreCase(createURI(value).getScheme());
+  }
+
   public static boolean isFileUri(String value) {
     String scheme = null;
     try {
@@ -84,7 +88,9 @@ public class ResourceDownloader {
     switch (getURLType(source)) {
     case FILE: return isLocalAllowed ? Lists.newArrayList(source) : null;
     case IVY: return dependencyResolver.downloadDependencies(source);
-    case OTHER: return Lists.newArrayList(
+    case HDFS:
+    case OTHER:
+      return Lists.newArrayList(
         createURI(downloadResource(source, subDir, convertToUnix)));
     default: throw new AssertionError(getURLType(source));
     }
@@ -117,13 +123,14 @@ public class ResourceDownloader {
     }
   }
 
-  private enum UriType { IVY, FILE, OTHER };
+  private enum UriType { IVY, FILE, HDFS, OTHER };
   private static ResourceDownloader.UriType getURLType(URI value) throws URISyntaxException {
     String scheme = value.getScheme();
     if (scheme == null) return UriType.FILE;
     scheme = scheme.toLowerCase();
     if ("ivy".equals(scheme)) return UriType.IVY;
     if ("file".equals(scheme)) return UriType.FILE;
+    if ("hdfs".equals(scheme)) return UriType.HDFS;
     return UriType.OTHER;
   }
 }

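The routing change is small but load-bearing: hdfs:// URIs are now recognized as their own UriType, yet they fall through to the same download path as OTHER, so a local copy is still made; the difference is that callers such as SessionState can now tell HDFS resources apart and record the mapping. A standalone sketch of the classification rules (a re-implementation for illustration, not the Hive API itself):

    import java.net.URI;

    public class UriTypeDemo {
      enum UriType { IVY, FILE, HDFS, OTHER }

      static UriType classify(String value) {
        String scheme = URI.create(value).getScheme();
        if (scheme == null) return UriType.FILE;  // bare paths count as local files
        switch (scheme.toLowerCase()) {
          case "ivy":  return UriType.IVY;
          case "file": return UriType.FILE;
          case "hdfs": return UriType.HDFS;
          default:     return UriType.OTHER;
        }
      }

      public static void main(String[] args) {
        System.out.println(classify("/tmp/a.jar"));                 // FILE
        System.out.println(classify("hdfs://nn:8020/apps/a.jar"));  // HDFS
        System.out.println(classify("ivy://org.example:udfs:1.0")); // IVY
        System.out.println(classify("s3a://bucket/a.jar"));         // OTHER
      }
    }
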
http://git-wip-us.apache.org/repos/asf/hive/blob/26753ade/ql/src/test/org/apache/hadoop/hive/ql/session/TestAddResource.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/session/TestAddResource.java b/ql/src/test/org/apache/hadoop/hive/ql/session/TestAddResource.java
index dafbe16..4b74972 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/session/TestAddResource.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/session/TestAddResource.java
@@ -83,7 +83,7 @@ public class TestAddResource {
     list.add(createURI(TEST_JAR_DIR + "testjar5.jar"));
 
     //return all the dependency urls
-    Mockito.when(ss.resolveAndDownload(query, false)).thenReturn(list);
+    Mockito.when(ss.resolveAndDownload(t, query, false)).thenReturn(list);
     addList.add(query);
     ss.add_resources(t, addList);
     Set<String> dependencies = ss.list_resource(t, null);
@@ -119,7 +119,7 @@ public class TestAddResource {
 
     Collections.sort(list);
 
-    Mockito.when(ss.resolveAndDownload(query, false)).thenReturn(list);
+    Mockito.when(ss.resolveAndDownload(t, query, false)).thenReturn(list);
     for (int i = 0; i < 10; i++) {
       addList.add(query);
     }
@@ -157,8 +157,8 @@ public class TestAddResource {
     list2.add(createURI(TEST_JAR_DIR + "testjar3.jar"));
     list2.add(createURI(TEST_JAR_DIR + "testjar4.jar"));
 
-    Mockito.when(ss.resolveAndDownload(query1, false)).thenReturn(list1);
-    Mockito.when(ss.resolveAndDownload(query2, false)).thenReturn(list2);
+    Mockito.when(ss.resolveAndDownload(t, query1, false)).thenReturn(list1);
+    Mockito.when(ss.resolveAndDownload(t, query2, false)).thenReturn(list2);
     addList.add(query1);
     addList.add(query2);
     ss.add_resources(t, addList);
@@ -208,8 +208,8 @@ public class TestAddResource {
     Collections.sort(list1);
     Collections.sort(list2);
 
-    Mockito.when(ss.resolveAndDownload(query1, false)).thenReturn(list1);
-    Mockito.when(ss.resolveAndDownload(query2, false)).thenReturn(list2);
+    Mockito.when(ss.resolveAndDownload(t, query1, false)).thenReturn(list1);
+    Mockito.when(ss.resolveAndDownload(t, query2, false)).thenReturn(list2);
     addList.add(query1);
     addList.add(query2);
     ss.add_resources(t, addList);
@@ -267,9 +267,9 @@ public class TestAddResource {
     Collections.sort(list2);
     Collections.sort(list3);
 
-    Mockito.when(ss.resolveAndDownload(query1, false)).thenReturn(list1);
-    Mockito.when(ss.resolveAndDownload(query2, false)).thenReturn(list2);
-    Mockito.when(ss.resolveAndDownload(query3, false)).thenReturn(list3);
+    Mockito.when(ss.resolveAndDownload(t, query1, false)).thenReturn(list1);
+    Mockito.when(ss.resolveAndDownload(t, query2, false)).thenReturn(list2);
+    Mockito.when(ss.resolveAndDownload(t, query3, false)).thenReturn(list3);
     addList.add(query1);
     addList.add(query2);
     addList.add(query3);
