Repository: hive Updated Branches: refs/heads/master 66a021164 -> 107204a78
HIVE-13716 : Improve dynamic partition loading V (Ashutosh Chauhan via Rui Li) Signed-off-by: Ashutosh Chauhan <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/107204a7 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/107204a7 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/107204a7 Branch: refs/heads/master Commit: 107204a78de0edceaeb4070c2df22214fb56b858 Parents: 66a0211 Author: Ashutosh Chauhan <[email protected]> Authored: Sun May 8 17:12:53 2016 -0700 Committer: Ashutosh Chauhan <[email protected]> Committed: Wed May 11 19:25:49 2016 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hive/common/FileUtils.java | 6 +- .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 2 +- .../apache/hadoop/hive/ql/exec/MoveTask.java | 2 +- .../apache/hadoop/hive/ql/metadata/Hive.java | 110 ++++++++++--------- .../org/apache/hadoop/hive/io/HdfsUtils.java | 71 +++++++----- 5 files changed, 109 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/107204a7/common/src/java/org/apache/hadoop/hive/common/FileUtils.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java index 71c9188..b65c35b 100644 --- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java @@ -529,7 +529,7 @@ public final class FileUtils { } else { try { //set on the entire subtree - HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, fs, lastExistingParent), fs, firstNonExistentParent); + HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, fs, lastExistingParent), fs, firstNonExistentParent, true); } catch (Exception e) { LOG.warn("Error setting permissions of " + firstNonExistentParent, e); } @@ -566,7 +566,7 @@ public final class FileUtils { boolean inheritPerms = conf.getBoolVar(HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS); if (copied && inheritPerms) { try { - HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, dstFS, dst.getParent()), dstFS, dst); + HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, dstFS, dst.getParent()), dstFS, dst, true); } catch (Exception e) { LOG.warn("Error setting permissions or group of " + dst, e); } @@ -685,7 +685,7 @@ public final class FileUtils { //rename the directory if (fs.rename(sourcePath, destPath)) { try { - HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, fs, destPath.getParent()), fs, destPath); + HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, fs, destPath.getParent()), fs, destPath, true); } catch (Exception e) { LOG.warn("Error setting permissions or group of " + destPath, e); } http://git-wip-us.apache.org/repos/asf/hive/blob/107204a7/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 9392b6d..707de1f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -4216,7 +4216,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { fs.delete(location, true); fs.mkdirs(location); try { - HdfsUtils.setFullFileStatus(conf, status, fs, location); + HdfsUtils.setFullFileStatus(conf, status, fs, location, false); } catch (Exception e) { LOG.warn("Error setting permissions of " + location, e); } http://git-wip-us.apache.org/repos/asf/hive/blob/107204a7/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index bdda89a..21aa315 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -180,7 +180,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable { fs.mkdirs(mkDirPath); if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS)) { try { - HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, fs, actualPath), fs, mkDirPath); + HdfsUtils.setFullFileStatus(conf, new HdfsUtils.HadoopFileStatus(conf, fs, actualPath), fs, mkDirPath, true); } catch (Exception e) { LOG.warn("Error setting permissions or group of " + actualPath, e); } http://git-wip-us.apache.org/repos/asf/hive/blob/107204a7/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 6af48ec..b5e660b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -2694,7 +2694,7 @@ private void constructOneLBLocationMap(FileStatus fSta, } if (inheritPerms) { - HdfsUtils.setFullFileStatus(conf, fullDestStatus, destFs, destPath); + HdfsUtils.setFullFileStatus(conf, fullDestStatus, destFs, destPath, false); } if (null != newFiles) { newFiles.add(destPath); @@ -2784,9 +2784,8 @@ private void constructOneLBLocationMap(FileStatus fSta, //method is called. when the replace value is true, this method works a little different //from mv command if the destf is a directory, it replaces the destf instead of moving under //the destf. in this case, the replaced destf still preserves the original destf's permission - public static boolean moveFile(HiveConf conf, Path srcf, final Path destf, + public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, boolean replace, boolean isSrcLocal) throws HiveException { - boolean success = false; final FileSystem srcFs, destFs; try { destFs = destf.getFileSystem(conf); @@ -2802,7 +2801,7 @@ private void constructOneLBLocationMap(FileStatus fSta, } //needed for perm inheritance. - boolean inheritPerms = HiveConf.getBoolVar(conf, + final boolean inheritPerms = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS); HdfsUtils.HadoopFileStatus destStatus = null; @@ -2823,8 +2822,8 @@ private void constructOneLBLocationMap(FileStatus fSta, //if destf is an existing file, rename is actually a replace, and do not need // to delete the file first if (replace && !destIsSubDir) { - LOG.debug("The path " + destf.toString() + " is deleted"); destFs.delete(destf, true); + LOG.debug("The path " + destf.toString() + " is deleted"); } } catch (FileNotFoundException ignore) { //if dest dir does not exist, any re @@ -2833,75 +2832,84 @@ private void constructOneLBLocationMap(FileStatus fSta, } } } + final HdfsUtils.HadoopFileStatus desiredStatus = destStatus; + final SessionState parentSession = SessionState.get(); if (isSrcLocal) { // For local src file, copy to hdfs destFs.copyFromLocalFile(srcf, destf); - success = true; + if (inheritPerms) { + try { + HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, true); + } catch (IOException e) { + LOG.warn("Error setting permission of file " + destf + ": "+ e.getMessage(), e); + } + } + return true; } else { if (needToCopy(srcf, destf, srcFs, destFs)) { //copy if across file system or encryption zones. - LOG.info("Copying source " + srcf + " to " + destf + " because HDFS encryption zones are different."); - success = FileUtils.copy(srcf.getFileSystem(conf), srcf, destf.getFileSystem(conf), destf, + LOG.debug("Copying source " + srcf + " to " + destf + " because HDFS encryption zones are different."); + return FileUtils.copy(srcf.getFileSystem(conf), srcf, destf.getFileSystem(conf), destf, true, // delete source replace, // overwrite destination conf); } else { if (destIsSubDir) { FileStatus[] srcs = destFs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER); - if (srcs.length == 0) { - success = true; // Nothing to move. - } else { - List<Future<Boolean>> futures = new LinkedList<>(); - final ExecutorService pool = Executors.newFixedThreadPool( - conf.getIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MoveDir-Thread-%d").build()); - /* Move files one by one because source is a subdirectory of destination */ - for (final FileStatus status : srcs) { - futures.add(pool.submit(new Callable<Boolean>() { - @Override - public Boolean call() throws Exception { - return destFs.rename(status.getPath(), destf); - } - })); - } - pool.shutdown(); - boolean allFutures = true; - for (Future<Boolean> future : futures) { - try { - Boolean result = future.get(); - allFutures &= result; - if (!result) { - LOG.debug("Failed to rename."); - pool.shutdownNow(); + + List<Future<Void>> futures = new LinkedList<>(); + final ExecutorService pool = Executors.newFixedThreadPool( + conf.getIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MoveDir-Thread-%d").build()); + /* Move files one by one because source is a subdirectory of destination */ + for (final FileStatus status : srcs) { + futures.add(pool.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + SessionState.setCurrentSessionState(parentSession); + Path destPath = new Path(destf, status.getPath().getName()); + try { + if(destFs.rename(status.getPath(), destf)) { + if (inheritPerms) { + HdfsUtils.setFullFileStatus(conf, desiredStatus, destFs, destPath, false); + } + } else { + throw new IOException("rename for src path: " + status.getPath() + " to dest path:" + + destPath + " returned false"); + } + } catch (IOException ioe) { + LOG.error("Failed to rename/set permissions. Src path: {} Dest path: {}", status.getPath(), destPath); + throw ioe; } - } catch (Exception e) { - LOG.debug("Failed to rename.", e.getMessage()); - pool.shutdownNow(); - throw new HiveException(e.getCause()); + return null; } + })); + } + pool.shutdown(); + for (Future<Void> future : futures) { + try { + future.get(); + } catch (Exception e) { + LOG.debug(e.getMessage()); + pool.shutdownNow(); + throw new HiveException(e.getCause()); } - success = allFutures; } + return true; } else { - success = destFs.rename(srcf, destf); + if (destFs.rename(srcf, destf)) { + if (inheritPerms) { + HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf, true); + } + return true; + } + return false; } } } - - LOG.info((replace ? "Replacing src:" : "Renaming src: ") + srcf.toString() - + ", dest: " + destf.toString() + ", Status:" + success); } catch (IOException ioe) { throw new HiveException("Unable to move source " + srcf + " to destination " + destf, ioe); } - - if (success && inheritPerms) { - try { - HdfsUtils.setFullFileStatus(conf, destStatus, destFs, destf); - } catch (IOException e) { - LOG.warn("Error setting permission of file " + destf + ": "+ e.getMessage(), e); - } - } - return success; } /** http://git-wip-us.apache.org/repos/asf/hive/blob/107204a7/shims/common/src/main/java/org/apache/hadoop/hive/io/HdfsUtils.java ---------------------------------------------------------------------- diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/io/HdfsUtils.java b/shims/common/src/main/java/org/apache/hadoop/hive/io/HdfsUtils.java index e931156..c2060fc 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/io/HdfsUtils.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/io/HdfsUtils.java @@ -57,10 +57,28 @@ public class HdfsUtils { } public static void setFullFileStatus(Configuration conf, HdfsUtils.HadoopFileStatus sourceStatus, - FileSystem fs, Path target) throws IOException { - FileStatus fStatus= sourceStatus.getFileStatus(); - String group = fStatus.getGroup(); - LOG.trace(sourceStatus.getFileStatus().toString()); + FileSystem fs, Path target, boolean recursion) throws IOException { + FileStatus fStatus= sourceStatus.getFileStatus(); + String group = fStatus.getGroup(); + boolean aclEnabled = Objects.equal(conf.get("dfs.namenode.acls.enabled"), "true"); + FsPermission sourcePerm = fStatus.getPermission(); + List<AclEntry> aclEntries = null; + AclStatus aclStatus; + if (aclEnabled) { + aclStatus = sourceStatus.getAclStatus(); + if (aclStatus != null) { + LOG.trace(aclStatus.toString()); + aclEntries = aclStatus.getEntries(); + removeBaseAclEntries(aclEntries); + + //the ACL api's also expect the tradition user/group/other permission in the form of ACL + aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.USER, sourcePerm.getUserAction())); + aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.GROUP, sourcePerm.getGroupAction())); + aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.OTHER, sourcePerm.getOtherAction())); + } + } + + if (recursion) { //use FsShell to change group, permissions, and extended ACL's recursively FsShell fsShell = new FsShell(); fsShell.setConf(conf); @@ -70,39 +88,40 @@ public class HdfsUtils { if (group != null && !group.isEmpty()) { run(fsShell, new String[]{"-chgrp", "-R", group, target.toString()}); } - - if (Objects.equal(conf.get("dfs.namenode.acls.enabled"), "true")) { - //Attempt extended Acl operations only if its enabled, 8791but don't fail the operation regardless. - try { - AclStatus aclStatus = sourceStatus.getAclStatus(); - if (aclStatus != null) { - LOG.trace(aclStatus.toString()); - List<AclEntry> aclEntries = aclStatus.getEntries(); - removeBaseAclEntries(aclEntries); - - //the ACL api's also expect the tradition user/group/other permission in the form of ACL - FsPermission sourcePerm = fStatus.getPermission(); - aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.USER, sourcePerm.getUserAction())); - aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.GROUP, sourcePerm.getGroupAction())); - aclEntries.add(newAclEntry(AclEntryScope.ACCESS, AclEntryType.OTHER, sourcePerm.getOtherAction())); - + if (aclEnabled) { + if (null != aclEntries) { + //Attempt extended Acl operations only if its enabled, 8791but don't fail the operation regardless. + try { //construct the -setfacl command - String aclEntry = Joiner.on(",").join(aclStatus.getEntries()); + String aclEntry = Joiner.on(",").join(aclEntries); run(fsShell, new String[]{"-setfacl", "-R", "--set", aclEntry, target.toString()}); + + } catch (Exception e) { + LOG.info("Skipping ACL inheritance: File system for path " + target + " " + + "does not support ACLs but dfs.namenode.acls.enabled is set to true. "); + LOG.debug("The details are: " + e, e); } - } catch (Exception e) { - LOG.info("Skipping ACL inheritance: File system for path " + target + " " + - "does not support ACLs but dfs.namenode.acls.enabled is set to true. "); - LOG.debug("The details are: " + e, e); } } else { - String permission = Integer.toString(fStatus.getPermission().toShort(), 8); + String permission = Integer.toString(sourcePerm.toShort(), 8); run(fsShell, new String[]{"-chmod", "-R", permission, target.toString()}); } } catch (Exception e) { throw new IOException("Unable to set permissions of " + target, e); } + } else { + if (group != null && !group.isEmpty()) { + fs.setOwner(target, null, group); + } + if (aclEnabled) { + if (null != aclEntries) { + fs.setAcl(target, aclEntries); + } + } else { + fs.setPermission(target, sourcePerm); + } } + } /** * Create a new AclEntry with scope, type and permission (no name).
