IMPALA-5431: Remove redundant path exists checks during table load

There are multiple places that do an exists() check on a path and then
perform some subsequent action on it. This pattern results in two RPCs
to the NN (one for the exists() check and one for the subsequent
action). We can avoid the exists() check in these cases since most HDFS
methods on paths throw a FileNotFoundException if the path does not
exist. This saves an RPC to the NN and improves metadata loading time.
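For illustration, the shape of the change is roughly the following (a
minimal sketch against the Hadoop FileSystem API; the class and method
names are hypothetical, not the exact Impala code):

    import java.io.FileNotFoundException;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ExistsCheckSketch {
      private static final Configuration CONF = new Configuration();

      // Before: two NN RPCs, one for exists() and one for listStatus().
      static FileStatus[] listWithExistsCheck(Path path) throws IOException {
        FileSystem fs = path.getFileSystem(CONF);
        if (!fs.exists(path)) return null;  // RPC 1
        return fs.listStatus(path);         // RPC 2
      }

      // After: a single RPC; a missing path surfaces as a
      // FileNotFoundException and is handled like a failed exists() check.
      static FileStatus[] listWithoutExistsCheck(Path path) throws IOException {
        FileSystem fs = path.getFileSystem(CONF);
        try {
          return fs.listStatus(path);       // the only RPC
        } catch (FileNotFoundException e) {
          return null;
        }
      }
    }

Besides saving a round trip, the try/catch form also closes the window
in which the path could be deleted between the exists() check and the
subsequent call.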
Testing: Existing tests already cover this code path. This patch passed
core and exhaustive tests. The metadata benchmark shows a decent
improvement in perf numbers, for example:

100K-PARTITIONS-1M-FILES-CUSTOM-05-QUERY-AFTER-INV              -20.51%
80-PARTITIONS-250K-FILES-S3-03-RECOVER                          -20.58%
80-PARTITIONS-250K-FILES-11-DROP-PARTITION                      -22.13%
80-PARTITIONS-250K-FILES-S3-08-ADD-PARTITION                    -22.38%
80-PARTITIONS-250K-FILES-S3-12-DROP                             -23.69%
100K-PARTITIONS-1M-FILES-CUSTOM-11-REFRESH-PARTITION            -23.91%
100K-PARTITIONS-1M-FILES-CUSTOM-10-REFRESH-AFTER-ADD-PARTITION  -26.04%
100K-PARTITIONS-1M-FILES-CUSTOM-07-REFRESH                      -26.38%
80-PARTITIONS-250K-FILES-S3-02-CREATE                           -36.47%
100K-PARTITIONS-1M-FILES-CUSTOM-12-QUERY-PARTITIONS             -58.72%
80-PARTITIONS-250K-FILES-S3-01-DROP                             -95.33%
80-PARTITIONS-250K-FILES-01-DROP                                -95.93%

Change-Id: Id10ecf64ea2eda2d0f9299c0aa371933eca22281
Reviewed-on: http://gerrit.cloudera.org:8080/7095
Reviewed-by: Bharath Vissapragada <[email protected]>
Tested-by: Impala Public Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d9fc9be0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d9fc9be0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d9fc9be0

Branch: refs/heads/master
Commit: d9fc9be02195605a63365d6171b75e95e646dab7
Parents: 466808a
Author: Bharath Vissapragada <[email protected]>
Authored: Mon Jun 5 16:04:46 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue Jun 27 03:40:52 2017 +0000

----------------------------------------------------------------------
 .../org/apache/impala/catalog/HdfsTable.java    | 68 +++++++++-----------
 .../apache/impala/common/FileSystemUtil.java    | 35 +++++++++-
 2 files changed, 64 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d9fc9be0/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index bbdbd16..6aca8dc 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -17,6 +17,7 @@
 package org.apache.impala.catalog;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -262,9 +263,8 @@ public class HdfsTable extends Table {
   private void loadBlockMetadata(Path dirPath,
       HashMap<Path, List<HdfsPartition>> partsByPath) {
     try {
-      FileSystem fs = dirPath.getFileSystem(CONF);
       // No need to load blocks for empty partitions list.
-      if (partsByPath.size() == 0 || !fs.exists(dirPath)) return;
+      if (partsByPath.size() == 0) return;
       if (LOG.isTraceEnabled()) {
         LOG.trace("Loading block md for " + name_ + " directory " + dirPath.toString());
       }
@@ -291,6 +291,7 @@ public class HdfsTable extends Table {
         }
       }
 
+      FileSystem fs = dirPath.getFileSystem(CONF);
       // For file systems that do not support BlockLocation API, we manually synthesize
       // block location metadata based on file formats.
       if (!FileSystemUtil.supportsStorageIds(fs)) {
@@ -298,8 +299,10 @@
         return;
       }
 
+      RemoteIterator<LocatedFileStatus> fileStatusIter =
+          FileSystemUtil.listFiles(fs, dirPath, true);
+      if (fileStatusIter == null) return;
       Reference<Long> numUnknownDiskIds = new Reference<Long>(Long.valueOf(0));
-      RemoteIterator<LocatedFileStatus> fileStatusIter = fs.listFiles(dirPath, true);
       while (fileStatusIter.hasNext()) {
         LocatedFileStatus fileStatus = fileStatusIter.next();
         if (!FileSystemUtil.isValidDataFile(fileStatus)) continue;
@@ -360,7 +363,9 @@
    */
   private void synthesizeBlockMetadata(FileSystem fs, Path dirPath,
       HashMap<Path, List<HdfsPartition>> partsByPath) throws IOException {
-    RemoteIterator<LocatedFileStatus> fileStatusIter = fs.listFiles(dirPath, true);
+    RemoteIterator<LocatedFileStatus> fileStatusIter =
+        FileSystemUtil.listFiles(fs, dirPath, true);
+    if (fileStatusIter == null) return;
     while (fileStatusIter.hasNext()) {
       LocatedFileStatus fileStatus = fileStatusIter.next();
       if (!FileSystemUtil.isValidDataFile(fileStatus)) continue;
@@ -644,9 +649,7 @@
       if (isMarkedCached_) part.markCached();
       addPartition(part);
       FileSystem fs = tblLocation.getFileSystem(CONF);
-      if (fs.exists(tblLocation)) {
-        accessLevel_ = getAvailableAccessLevel(fs, tblLocation);
-      }
+      accessLevel_ = getAvailableAccessLevel(fs, tblLocation);
     } else {
       for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) {
         HdfsPartition partition = createPartition(msPartition.getSd(), msPartition);
@@ -704,10 +707,6 @@
     Preconditions.checkNotNull(partDir);
     try {
       FileSystem fs = partDir.getFileSystem(CONF);
-      if (!fs.exists(partDir)) {
-        partition.setFileDescriptors(new ArrayList<FileDescriptor>());
-        return;
-      }
       if (!FileSystemUtil.supportsStorageIds(fs)) {
         synthesizeBlockMetadata(fs, partition);
         return;
       }
@@ -722,21 +721,22 @@
       // Iterate through the current snapshot of the partition directory listing to
       // figure out files that were newly added/modified.
       List<FileDescriptor> newFileDescs = Lists.newArrayList();
-      long newPartSizeBytes = 0;
-      for (FileStatus fileStatus : fs.listStatus(partDir)) {
-        if (!FileSystemUtil.isValidDataFile(fileStatus)) continue;
-        String fileName = fileStatus.getPath().getName().toString();
-        FileDescriptor fd = fileDescsByName.get(fileName);
-        if (fd == null || partition.isMarkedCached() ||
-            fd.getFileLength() != fileStatus.getLen() ||
-            fd.getModificationTime() != fileStatus.getModificationTime()) {
-          BlockLocation[] locations =
-              fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
-          fd = FileDescriptor.create(fileStatus, locations, fs, hostIndex_,
-              new Reference<Long>(Long.valueOf(0)));
+      FileStatus[] pathStatus = FileSystemUtil.listStatus(fs, partDir);
+      if (pathStatus != null) {
+        for (FileStatus fileStatus: pathStatus) {
+          if (!FileSystemUtil.isValidDataFile(fileStatus)) continue;
+          String fileName = fileStatus.getPath().getName().toString();
+          FileDescriptor fd = fileDescsByName.get(fileName);
+          if (fd == null || partition.isMarkedCached() ||
+              fd.getFileLength() != fileStatus.getLen() ||
+              fd.getModificationTime() != fileStatus.getModificationTime()) {
+            BlockLocation[] locations =
+                fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+            fd = FileDescriptor.create(fileStatus, locations, fs, hostIndex_,
+                new Reference<Long>(Long.valueOf(0)));
+          }
+          newFileDescs.add(fd);
         }
-        newFileDescs.add(fd);
-        newPartSizeBytes += fileStatus.getLen();
       }
       partition.setFileDescriptors(newFileDescs);
     } catch(IOException e) {
@@ -800,7 +800,7 @@
     FsPermissionChecker permissionChecker = FsPermissionChecker.getInstance();
     while (location != null) {
-      if (fs.exists(location)) {
+      try {
         FsPermissionChecker.Permissions perms =
             permissionChecker.getPermissions(fs, location);
         if (perms.canReadAndWrite()) {
@@ -811,8 +811,9 @@
           return TAccessLevel.WRITE_ONLY;
         }
         return TAccessLevel.NONE;
+      } catch (FileNotFoundException e) {
+        location = location.getParent();
       }
-      location = location.getParent();
     }
     // Should never get here.
     Preconditions.checkNotNull(location, "Error: no path ancestor exists");
@@ -1105,9 +1106,7 @@
     if (msTbl.getPartitionKeysSize() == 0) {
       Path location = new Path(hdfsBaseDir_);
       FileSystem fs = location.getFileSystem(CONF);
-      if (fs.exists(location)) {
-        accessLevel_ = getAvailableAccessLevel(fs, location);
-      }
+      accessLevel_ = getAvailableAccessLevel(fs, location);
     }
     setMetaStoreTable(msTbl);
   }
@@ -1482,9 +1481,6 @@
       HdfsPartition partition) throws Exception {
     Preconditions.checkNotNull(storageDescriptor);
     Preconditions.checkNotNull(partition);
-    Path partDirPath = new Path(storageDescriptor.getLocation());
-    FileSystem fs = partDirPath.getFileSystem(CONF);
-    if (!fs.exists(partDirPath)) return;
     refreshFileMetadata(partition);
   }
@@ -1661,9 +1657,6 @@
       HashSet<List<LiteralExpr>> existingPartitions,
       List<List<String>> partitionsNotInHms) throws IOException {
     FileSystem fs = path.getFileSystem(CONF);
-    // Check whether the base directory exists.
-    if (!fs.exists(path)) return;
-
     List<String> partitionValues = Lists.newArrayList();
     List<LiteralExpr> partitionExprs = Lists.newArrayList();
     getAllPartitionsNotInHms(path, partitionKeys, 0, fs, partitionValues,
@@ -1701,7 +1694,8 @@
       return;
     }
 
-    FileStatus[] statuses = fs.listStatus(path);
+    FileStatus[] statuses = FileSystemUtil.listStatus(fs, path);
+    if (statuses == null) return;
     for (FileStatus status: statuses) {
       if (!status.isDirectory()) continue;
       Pair<String, LiteralExpr> keyValues =

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d9fc9be0/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index 9ae4269..2841993 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -29,14 +29,17 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.adl.AdlFileSystem;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.impala.catalog.HdfsCompression;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
@@ -46,7 +49,7 @@ import com.google.common.base.Preconditions;
  */
 public class FileSystemUtil {
   private static final Configuration CONF = new Configuration();
-  private static final Logger LOG = Logger.getLogger(FileSystemUtil.class);
+  private static final Logger LOG = LoggerFactory.getLogger(FileSystemUtil.class);
 
   /**
    * Performs a non-recursive delete of all visible (non-hidden) files in a given
@@ -483,4 +486,32 @@
         FileSystemUtil.isS3AFileSystem(path) ||
         FileSystemUtil.isADLFileSystem(path));
   }
+
+  /**
+   * Wrapper around FileSystem.listStatus() that specifically handles the case when
+   * the path does not exist. This helps simplify the caller code in cases where
+   * the file does not exist and also saves an RPC as the caller need not do a separate
+   * exists check for the path. Returns null if the path does not exist.
+   */
+  public static FileStatus[] listStatus(FileSystem fs, Path p) throws IOException {
+    try {
+      return fs.listStatus(p);
+    } catch (FileNotFoundException e) {
+      if (LOG.isWarnEnabled()) LOG.warn("Path does not exist: " + p.toString(), e);
+      return null;
+    }
+  }
+
+  /**
+   * Wrapper around FileSystem.listFiles(), similar to the listStatus() wrapper above.
+   */
+  public static RemoteIterator<LocatedFileStatus> listFiles(FileSystem fs, Path p,
+      boolean recursive) throws IOException {
+    try {
+      return fs.listFiles(p, recursive);
+    } catch (FileNotFoundException e) {
+      if (LOG.isWarnEnabled()) LOG.warn("Path does not exist: " + p.toString(), e);
+      return null;
+    }
+  }
 }
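With these wrappers, a caller treats a null return as "path does not
exist" instead of issuing a separate exists() probe. A simplified usage
sketch, modeled on the refreshFileMetadata() change above (the loop
body is elided):

    FileStatus[] pathStatus = FileSystemUtil.listStatus(fs, partDir);
    if (pathStatus == null) return;  // Partition directory does not exist.
    for (FileStatus fileStatus : pathStatus) {
      if (!FileSystemUtil.isValidDataFile(fileStatus)) continue;
      // ... build or reuse a FileDescriptor for this file ...
    }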
