HIVE-14953 : don't use globStatus on S3 in MM tables (Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0f7f4ed8 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0f7f4ed8 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0f7f4ed8 Branch: refs/heads/hive-14535 Commit: 0f7f4ed83fffb355666ebf0a0a259872156a133c Parents: 65a380d Author: Sergey Shelukhin <[email protected]> Authored: Tue Oct 25 14:06:14 2016 -0700 Committer: Sergey Shelukhin <[email protected]> Committed: Tue Oct 25 14:06:14 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hive/common/ValidWriteIds.java | 5 +- .../org/apache/hadoop/hive/conf/HiveConf.java | 4 + .../apache/hadoop/hive/ql/exec/Utilities.java | 174 +++++++++++++------ .../apache/hadoop/hive/ql/metadata/Hive.java | 35 ++-- 4 files changed, 150 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/0f7f4ed8/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java index 6b38247..c61b63a 100644 --- a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java +++ b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java @@ -122,18 +122,17 @@ public class ValidWriteIds { public static class IdPathFilter implements PathFilter { - private final String mmDirName, tmpPrefix; + private final String mmDirName; private final boolean isMatch; public IdPathFilter(long writeId, boolean isMatch) { this.mmDirName = ValidWriteIds.getMmFilePrefix(writeId); - this.tmpPrefix = "_tmp." 
+ mmDirName; this.isMatch = isMatch; } @Override public boolean accept(Path path) { String name = path.getName(); - return isMatch == (name.equals(mmDirName) || name.equals(tmpPrefix)); + return isMatch == name.equals(mmDirName); } } http://git-wip-us.apache.org/repos/asf/hive/blob/0f7f4ed8/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 8a00f07..6848811 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3141,6 +3141,10 @@ public class HiveConf extends Configuration { "MM write ID will not be removed up for that long after it has been aborted;\n" + "this is to work around potential races e.g. with FS visibility, when deleting files."), + + HIVE_MM_AVOID_GLOBSTATUS_ON_S3("hive.mm.avoid.s3.globstatus", true, + "Whether to use listFiles (optimized on S3) instead of globStatus when on S3."), + HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list", "hive.security.authenticator.manager,hive.security.authorization.manager," + "hive.security.metastore.authorization.manager,hive.security.metastore.authenticator.manager," + http://git-wip-us.apache.org/repos/asf/hive/blob/0f7f4ed8/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index a7050ab..e0af81e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -85,8 +85,10 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import 
org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.HiveInterruptCallback; @@ -1524,6 +1526,18 @@ public final class Utilities { ? conf.getTable().getNumBuckets() : 0; return removeTempOrDuplicateFiles(fs, fileStats, dpLevels, numBuckets, hconf, null); } + + private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws IOException { + FileStatus[] items = fs.listStatus(path); + // remove empty directory since DP insert should not generate empty partitions. + // empty directories could be generated by crashed Task/ScriptOperator + if (items.length != 0) return false; + if (!fs.delete(path, true)) { + LOG.error("Cannot delete empty directory " + path); + throw new IOException("Cannot delete empty directory " + path); + } + return true; + } public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats, int dpLevels, int numBuckets, Configuration hconf, Long mmWriteId) throws IOException { @@ -1535,21 +1549,15 @@ public final class Utilities { if (dpLevels > 0) { FileStatus parts[] = fileStats; for (int i = 0; i < parts.length; ++i) { - assert parts[i].isDir() : "dynamic partition " + parts[i].getPath() + assert parts[i].isDirectory() : "dynamic partition " + parts[i].getPath() + " is not a directory"; - Utilities.LOG14535.info("removeTempOrDuplicateFiles looking at DP " + parts[i].getPath()); - FileStatus[] items = fs.listStatus(parts[i].getPath()); - - // remove empty directory since DP insert should not generate empty partitions. 
- // empty directories could be generated by crashed Task/ScriptOperator - if (items.length == 0) { - if (!fs.delete(parts[i].getPath(), true)) { - LOG.error("Cannot delete empty directory " + parts[i].getPath()); - throw new IOException("Cannot delete empty directory " + parts[i].getPath()); - } + Path path = parts[i].getPath(); + Utilities.LOG14535.info("removeTempOrDuplicateFiles looking at DP " + path); + if (removeEmptyDpDirectory(fs, path)) { parts[i] = null; continue; } + FileStatus[] items = fs.listStatus(path); if (mmWriteId != null) { Path mmDir = parts[i].getPath(); @@ -1575,8 +1583,7 @@ public final class Utilities { throw new IOException("Unexpected directories for non-DP MM: " + Arrays.toString(items)); } Path mmDir = items[0].getPath(); - if (!items[0].isDirectory() - || !mmDir.getName().equals(ValidWriteIds.getMmFilePrefix(mmWriteId))) { + if (!mmDir.getName().equals(ValidWriteIds.getMmFilePrefix(mmWriteId))) { throw new IOException("Unexpected non-MM directory " + mmDir); } Utilities.LOG14535.info( @@ -3803,31 +3810,98 @@ public final class Utilities { } } - public static FileStatus[] getMmDirectoryCandidates(FileSystem fs, Path path, int dpLevels, - int lbLevels, PathFilter filter, long mmWriteId) throws IOException { + public static Path[] getMmDirectoryCandidates(FileSystem fs, Path path, int dpLevels, + int lbLevels, PathFilter filter, long mmWriteId, Configuration conf) throws IOException { + int skipLevels = dpLevels + lbLevels; + if (filter == null) { + filter = new ValidWriteIds.IdPathFilter(mmWriteId, true); + } + if (skipLevels == 0) { + return statusToPath(fs.listStatus(path, filter)); + } + if (fs.getScheme().equalsIgnoreCase("s3a") + && HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_AVOID_GLOBSTATUS_ON_S3)) { + return getMmDirectoryCandidatesRecursive(fs, path, skipLevels, filter); + } + return getMmDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, mmWriteId); + } + + private static Path[] statusToPath(FileStatus[] statuses) { + 
if (statuses == null) return null; + Path[] paths = new Path[statuses.length]; + for (int i = 0; i < statuses.length; ++i) { + paths[i] = statuses[i].getPath(); + } + return paths; + } + + private static Path[] getMmDirectoryCandidatesRecursive(FileSystem fs, + Path path, int skipLevels, PathFilter filter) throws IOException { + String lastRelDir = null; + HashSet<Path> results = new HashSet<Path>(); + String relRoot = Path.getPathWithoutSchemeAndAuthority(path).toString(); + if (!relRoot.endsWith(Path.SEPARATOR)) { + relRoot += Path.SEPARATOR; + } + RemoteIterator<LocatedFileStatus> allFiles = fs.listFiles(path, true); + while (allFiles.hasNext()) { + LocatedFileStatus lfs = allFiles.next(); + Path dirPath = Path.getPathWithoutSchemeAndAuthority(lfs.getPath()); + String dir = dirPath.toString(); + if (!dir.startsWith(relRoot)) { + throw new IOException("Path " + lfs.getPath() + " is not under " + relRoot + + " (when shortened to " + dir + ")"); + } + String subDir = dir.substring(relRoot.length()); + Utilities.LOG14535.info("Looking at " + subDir + " from " + lfs.getPath()); + // If sorted, we'll skip a bunch of files. + if (lastRelDir != null && subDir.startsWith(lastRelDir)) continue; + int startIx = skipLevels > 0 ? 
-1 : 0; + for (int i = 0; i < skipLevels; ++i) { + startIx = subDir.indexOf(Path.SEPARATOR_CHAR, startIx + 1); + if (startIx == -1) { + Utilities.LOG14535.info("Expected level of nesting (" + skipLevels + ") is not " + + " present in " + subDir + " (from " + lfs.getPath() + ")"); + break; + } + } + if (startIx == -1) continue; + int endIx = subDir.indexOf(Path.SEPARATOR_CHAR, startIx + 1); + if (endIx == -1) { + Utilities.LOG14535.info("Expected level of nesting (" + (skipLevels + 1) + ") is not " + + " present in " + subDir + " (from " + lfs.getPath() + ")"); + continue; + } + lastRelDir = subDir = subDir.substring(0, endIx); + Path candidate = new Path(relRoot, subDir); + Utilities.LOG14535.info("Considering MM directory candidate " + candidate); + if (!filter.accept(candidate)) continue; + results.add(fs.makeQualified(candidate)); + } + return results.toArray(new Path[results.size()]); + } + + private static Path[] getMmDirectoryCandidatesGlobStatus(FileSystem fs, + Path path, int skipLevels, PathFilter filter, long mmWriteId) throws IOException { StringBuilder sb = new StringBuilder(path.toUri().getPath()); - for (int i = 0; i < dpLevels + lbLevels; i++) { + for (int i = 0; i < skipLevels; i++) { sb.append(Path.SEPARATOR).append("*"); } sb.append(Path.SEPARATOR).append(ValidWriteIds.getMmFilePrefix(mmWriteId)); - Utilities.LOG14535.info("Looking for files via: " + sb.toString()); Path pathPattern = new Path(path, sb.toString()); - if (filter == null) { - // TODO: do we need this? Likely yes; we don't want mm_10 when we use ".../mm_1" pattern. - filter = new ValidWriteIds.IdPathFilter(mmWriteId, true); - } - return filter == null ? 
fs.globStatus(pathPattern) : fs.globStatus(pathPattern, filter); + Utilities.LOG14535.info("Looking for files via: " + pathPattern); + return statusToPath(fs.globStatus(pathPattern, filter)); } private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manifestDir, int dpLevels, int lbLevels, String unionSuffix, ValidWriteIds.IdPathFilter filter, - long mmWriteId) throws IOException { - FileStatus[] files = getMmDirectoryCandidates( - fs, specPath, dpLevels, lbLevels, filter, mmWriteId); + long mmWriteId, Configuration conf) throws IOException { + Path[] files = getMmDirectoryCandidates( + fs, specPath, dpLevels, lbLevels, filter, mmWriteId, conf); if (files != null) { - for (FileStatus status : files) { - Utilities.LOG14535.info("Deleting " + status.getPath() + " on failure"); - tryDelete(fs, status.getPath()); + for (Path path : files) { + Utilities.LOG14535.info("Deleting " + path + " on failure"); + tryDelete(fs, path); } } Utilities.LOG14535.info("Deleting " + manifestDir + " on failure"); @@ -3882,15 +3956,15 @@ public final class Utilities { if (!success) { ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(mmWriteId, true); tryDeleteAllMmFiles(fs, specPath, manifestDir, dpLevels, lbLevels, - unionSuffix, filter, mmWriteId); + unionSuffix, filter, mmWriteId, hconf); return; } Utilities.LOG14535.info("Looking for manifests in: " + manifestDir + " (" + mmWriteId + ")"); - FileStatus[] files = fs.listStatus(manifestDir); + FileStatus[] manifestFiles = fs.listStatus(manifestDir); List<Path> manifests = new ArrayList<>(); - if (files != null) { - for (FileStatus status : files) { + if (manifestFiles != null) { + for (FileStatus status : manifestFiles) { Path path = status.getPath(); if (path.getName().endsWith(MANIFEST_EXTENSION)) { Utilities.LOG14535.info("Reading manifest " + path); @@ -3901,21 +3975,13 @@ public final class Utilities { Utilities.LOG14535.info("Looking for files in: " + specPath); ValidWriteIds.IdPathFilter 
filter = new ValidWriteIds.IdPathFilter(mmWriteId, true); - files = getMmDirectoryCandidates( - fs, specPath, dpLevels, lbLevels, filter, mmWriteId); - ArrayList<FileStatus> mmDirectories = new ArrayList<>(); + Path[] files = getMmDirectoryCandidates( + fs, specPath, dpLevels, lbLevels, filter, mmWriteId, hconf); + ArrayList<Path> mmDirectories = new ArrayList<>(); if (files != null) { - for (FileStatus status : files) { - Path path = status.getPath(); + for (Path path : files) { Utilities.LOG14535.info("Looking at path: " + path); - if (!status.isDirectory()) { - if (!path.getName().endsWith(MANIFEST_EXTENSION)) { - Utilities.LOG14535.warn("Unknown file found, deleting: " + path); - tryDelete(fs, path); - } - } else { - mmDirectories.add(status); - } + mmDirectories.add(path); } } @@ -3944,8 +4010,8 @@ public final class Utilities { } } - for (FileStatus status : mmDirectories) { - cleanMmDirectory(status.getPath(), fs, unionSuffix, committed); + for (Path path : mmDirectories) { + cleanMmDirectory(path, fs, unionSuffix, committed); } if (!committed.isEmpty()) { @@ -3957,7 +4023,12 @@ public final class Utilities { // TODO: see HIVE-14886 - removeTempOrDuplicateFiles is broken for list bucketing, // so maintain parity here by not calling it at all. if (lbLevels != 0) return; - FileStatus[] finalResults = mmDirectories.toArray(new FileStatus[mmDirectories.size()]); + // Create fake file statuses to avoid querying the file system. removeTempOrDuplicateFiles + // doesn't need to check anything except path and directory status for MM directories. + FileStatus[] finalResults = new FileStatus[mmDirectories.size()]; + for (int i = 0; i < mmDirectories.size(); ++i) { + finalResults[i] = new PathOnlyFileStatus(mmDirectories.get(i)); + } List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles( fs, finalResults, dpLevels, mbc == null ?
0 : mbc.numBuckets, hconf, mmWriteId); // create empty buckets if necessary @@ -3967,6 +4038,12 @@ public final class Utilities { } } + private static final class PathOnlyFileStatus extends FileStatus { + public PathOnlyFileStatus(Path path) { + super(0, true, 0, 0, 0, path); + } + } + private static void cleanMmDirectory(Path dir, FileSystem fs, String unionSuffix, HashSet<String> committed) throws IOException, HiveException { for (FileStatus child : fs.listStatus(dir)) { @@ -3975,7 +4052,6 @@ public final class Utilities { if (committed.remove(childPath.toString())) continue; // A good file. deleteUncommitedFile(childPath, fs); } else if (!child.isDirectory()) { - // TODO# needed? if (childPath.getName().endsWith(MANIFEST_EXTENSION)) continue; if (committed.contains(childPath.toString())) { throw new HiveException("Union FSOP has commited " + childPath + " outside of union directory" + unionSuffix); http://git-wip-us.apache.org/repos/asf/hive/blob/0f7f4ed8/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 0a29895..c7ac452 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1589,6 +1589,7 @@ public class Hive { List<Path> newFiles = null; PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin("MoveTask", "FileMoves"); + // TODO: this assumes both paths are qualified; which they are, currently. if (mmWriteId != null && loadPath.equals(newPartPath)) { // MM insert query, move itself is a no-op. 
Utilities.LOG14535.info("not moving " + loadPath + " to " + newPartPath + " (MM)"); @@ -1705,7 +1706,7 @@ public class Hive { FileSystem srcFs; try { srcFs = loadPath.getFileSystem(conf); - srcs = srcFs.globStatus(loadPath); + srcs = srcFs.listStatus(loadPath); } catch (IOException e) { LOG.error("Error listing files", e); throw new HiveException(e); @@ -1847,29 +1848,30 @@ private void constructOneLBLocationMap(FileStatus fSta, Set<Path> validPartitions = new HashSet<Path>(); try { FileSystem fs = loadPath.getFileSystem(conf); - FileStatus[] leafStatus = null; if (mmWriteId == null) { - leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs); + FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs); + // Check for empty partitions + for (FileStatus s : leafStatus) { + if (!s.isDirectory()) { + throw new HiveException("partition " + s.getPath() + " is not a directory!"); + } + Path dpPath = s.getPath(); + Utilities.LOG14535.info("Found DP " + dpPath); + validPartitions.add(dpPath); + } } else { // The non-MM path only finds new partitions, as it is looking at the temp path. // To produce the same effect, we will find all the partitions affected by this write ID. - leafStatus = Utilities.getMmDirectoryCandidates( - fs, loadPath, numDP, numLB, null, mmWriteId); - } - // Check for empty partitions - for (FileStatus s : leafStatus) { - if (mmWriteId == null && !s.isDirectory()) { - throw new HiveException("partition " + s.getPath() + " is not a directory!"); - } - Path dpPath = s.getPath(); - if (mmWriteId != null) { - dpPath = dpPath.getParent(); // Skip the MM directory that we have found. + Path[] leafStatus = Utilities.getMmDirectoryCandidates( + fs, loadPath, numDP, numLB, null, mmWriteId, conf); + for (Path p : leafStatus) { + Path dpPath = p.getParent(); // Skip the MM directory that we have found. for (int i = 0; i < numLB; ++i) { dpPath = dpPath.getParent(); // Now skip the LB directories, if any... 
} + Utilities.LOG14535.info("Found DP " + dpPath); + validPartitions.add(dpPath); } - Utilities.LOG14535.info("Found DP " + dpPath); - validPartitions.add(dpPath); } } catch (IOException e) { throw new HiveException(e); @@ -2047,6 +2049,7 @@ private void constructOneLBLocationMap(FileStatus fSta, if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) { newFiles = Collections.synchronizedList(new ArrayList<Path>()); } + // TODO: this assumes both paths are qualified; which they are, currently. if (mmWriteId != null && loadPath.equals(tbl.getPath())) { Utilities.LOG14535.info("not moving " + loadPath + " to " + tbl.getPath()); if (replace) {
