Repository: incubator-impala
Updated Branches:
  refs/heads/master 5f2a44069 -> 8149d0e57

IMPALA-4847: Simplify HdfsTable block metadata loading code

This commit is part of the ground work for the upcoming multi-threaded
block metadata loading patches.

The patch for IMPALA-4172 introduced code that groups the block location
requests for partition directories that reside under the table directory
into a single call to the NN in order to reduce the number of RPCs.
However, it turns out that the HDFS client library internally makes one
RPC per directory, thus defeating the purpose of the optimization. This
also made the code unnecessarily complex, since each file has to be
mapped to its corresponding partition at runtime.

This patch undoes that optimization and makes the HDFS calls per
partition, which is much easier to understand. This also helps the
upcoming patch on multi-threaded block metadata loading, since there is
much less shared state when loading multiple partitions in parallel.

Change-Id: I963d647bd2ba11e3843c6ef2ac6be113c74280bf
Reviewed-on: http://gerrit.cloudera.org:8080/7652
Reviewed-by: Bharath Vissapragada <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/59d1aa6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/59d1aa6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/59d1aa6e

Branch: refs/heads/master
Commit: 59d1aa6ea7788398574263c4a716b41c8b250ba6
Parents: 5f2a440
Author: Bharath Vissapragada <[email protected]>
Authored: Mon Jul 17 11:55:51 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Mon Aug 28 20:03:06 2017 +0000

----------------------------------------------------------------------
 .../org/apache/impala/catalog/HdfsTable.java   | 214 ++++++-------------
 .../apache/impala/common/FileSystemUtil.java   |  26 ---
 2 files changed, 60 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59d1aa6e/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 6aca8dc..45ebfc9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -249,79 +249,56 @@ public class HdfsTable extends Table {
   }
 
   /**
-   * Drops and re-loads the block metadata for all partitions in 'partsByPath' whose
-   * location is under the given 'dirPath'. It involves the following steps:
+   * Drops and re-loads the block metadata of all the partitions in 'partitions' that
+   * correspond to the path 'partDir'. 'partDir' may belong to any file system that
+   * implements the hadoop's FileSystem interface (like HDFS, S3, ADLS etc.). It involves
+   * the following steps:
    * - Clear the current block metadata of the partitions.
-   * - Call FileSystem.listStatus() on 'dirPath' to fetch the BlockLocations for each
-   *   file under it recursively.
-   * - For every valid data file, map it to a partition from 'partsByPath' (if one exists)
-   *   and enumerate all its blocks and their corresponding hosts and disk IDs.
-   * Requires that 'dirPath' and all paths in 'partsByPath' have consistent qualification
-   * (either fully qualified or unqualified), for isDescendantPath().
-   * TODO: Split this method into more logical methods for cleaner code.
+   * - Call FileSystem.listFiles() on 'partDir' to fetch the FileStatus and BlockLocations
+   *   for each file under it.
+   * - For every valid data file, enumerate all its blocks and their corresponding hosts
+   *   and disk IDs if the underlying file system supports the block locations API
+   *   (for ex: HDFS). For other file systems (like S3) we synthesize the block metadata
+   *   manually by splitting the file ranges into fixed size blocks.
+   *   For filesystems that don't support BlockLocation API, synthesize file blocks
+   *   by manually splitting the file range into fixed-size blocks. That way, scan
+   *   ranges can be derived from file blocks as usual. All synthesized blocks are given
+   *   an invalid network address so that the scheduler will treat them as remote.
    */
-  private void loadBlockMetadata(Path dirPath,
-      HashMap<Path, List<HdfsPartition>> partsByPath) {
+  private void loadBlockMetadata(Path partDir, List<HdfsPartition> partitions) {
     try {
       // No need to load blocks for empty partitions list.
-      if (partsByPath.size() == 0) return;
+      if (partitions == null || partitions.isEmpty()) return;
 
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Loading block md for " + name_ + " directory " + dirPath.toString());
+        LOG.trace("Loading block md for " + name_ + " directory " + partDir.toString());
       }
 
       // Clear the state of partitions under dirPath since they are going to be updated
       // based on the current snapshot of files in the directory.
-      List<HdfsPartition> dirPathPartitions = partsByPath.get(dirPath);
-      if (dirPathPartitions != null) {
-        // The dirPath is a partition directory. This means the path is the root of an
-        // unpartitioned table, or the path of at least one partition.
-        for (HdfsPartition partition: dirPathPartitions) {
-          partition.setFileDescriptors(new ArrayList<FileDescriptor>());
-        }
-      } else {
-        // The dirPath is not a partition directory. We expect it to be an ancestor of
-        // partition paths (e.g., the table root). Clear all partitions whose paths are
-        // a descendant of dirPath.
-        for (Map.Entry<Path, List<HdfsPartition>> entry: partsByPath.entrySet()) {
-          Path partDir = entry.getKey();
-          if (!FileSystemUtil.isDescendantPath(partDir, dirPath)) continue;
-          for (HdfsPartition partition: entry.getValue()) {
-            partition.setFileDescriptors(new ArrayList<FileDescriptor>());
-          }
-        }
-      }
-
-      FileSystem fs = dirPath.getFileSystem(CONF);
-      // For file systems that do not support BlockLocation API, we manually synthesize
-      // block location metadata based on file formats.
-      if (!FileSystemUtil.supportsStorageIds(fs)) {
-        synthesizeBlockMetadata(fs, dirPath, partsByPath);
-        return;
+      for (HdfsPartition partition: partitions) {
+        partition.setFileDescriptors(new ArrayList<FileDescriptor>());
       }
 
+      FileSystem fs = partDir.getFileSystem(CONF);
+      boolean synthesizeBlockMd = !FileSystemUtil.supportsStorageIds(fs);
       RemoteIterator<LocatedFileStatus> fileStatusIter =
-          FileSystemUtil.listFiles(fs, dirPath, true);
+          FileSystemUtil.listFiles(fs, partDir, false);
       if (fileStatusIter == null) return;
       Reference<Long> numUnknownDiskIds = new Reference<Long>(Long.valueOf(0));
       while (fileStatusIter.hasNext()) {
         LocatedFileStatus fileStatus = fileStatusIter.next();
         if (!FileSystemUtil.isValidDataFile(fileStatus)) continue;
-        // Find the partition that this file belongs (if any).
-        Path partPathDir = fileStatus.getPath().getParent();
-        Preconditions.checkNotNull(partPathDir);
-
-        List<HdfsPartition> partitions = partsByPath.get(partPathDir);
-        // Skip if this file does not belong to any known partition.
-        if (partitions == null) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("File " + fileStatus.getPath().toString() + " doesn't correspond " +
-                " to a known partition. Skipping metadata load for this file.");
-          }
-          continue;
+        FileDescriptor fd = null;
+        // Block locations are manually synthesized if the underlying fs does not support
+        // the block location API.
+        if (synthesizeBlockMd) {
+          fd = FileDescriptor.createWithSynthesizedBlockMd(fileStatus,
+              partitions.get(0).getFileFormat(), hostIndex_);
+        } else {
+          fd = FileDescriptor.create(fileStatus,
+              fileStatus.getBlockLocations(), fs, hostIndex_, numUnknownDiskIds);
         }
-
-        FileDescriptor fd = FileDescriptor.create(fileStatus,
-            fileStatus.getBlockLocations(), fs, hostIndex_, numUnknownDiskIds);
+        Preconditions.checkNotNull(fd);
         // Update the partitions' metadata that this file belongs to.
         for (HdfsPartition partition: partitions) {
           partition.getFileDescriptors().add(fd);
@@ -337,59 +314,7 @@ public class HdfsTable extends Table {
       }
     } catch (IOException e) {
       throw new RuntimeException("Error loading block metadata for directory "
-          + dirPath.toString() + ": " + e.getMessage(), e);
-    }
-  }
-
-  /**
-   * Synthesize the block metadata for a given HdfsPartition object. Should only
-   * be called for FileSystems that do not support storage IDs.
-   */
-  private void synthesizeBlockMetadata(FileSystem fs, HdfsPartition partition)
-      throws IOException {
-    Preconditions.checkState(!FileSystemUtil.supportsStorageIds(fs));
-    HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
-    Path partitionPath = partition.getLocationPath();
-    partition.setFileDescriptors(new ArrayList<FileDescriptor>());
-    partsByPath.put(partitionPath, Lists.newArrayList(partition));
-    synthesizeBlockMetadata(fs, partitionPath, partsByPath);
-  }
-
-  /**
-   * For filesystems that don't support BlockLocation API, synthesize file blocks
-   * by manually splitting the file range into fixed-size blocks. That way, scan
-   * ranges can be derived from file blocks as usual. All synthesized blocks are given
-   * an invalid network address so that the scheduler will treat them as remote.
-   */
-  private void synthesizeBlockMetadata(FileSystem fs, Path dirPath, HashMap<Path,
-      List<HdfsPartition>> partsByPath) throws IOException {
-    RemoteIterator<LocatedFileStatus> fileStatusIter =
-        FileSystemUtil.listFiles(fs, dirPath, true);
-    if (fileStatusIter == null) return;
-    while (fileStatusIter.hasNext()) {
-      LocatedFileStatus fileStatus = fileStatusIter.next();
-      if (!FileSystemUtil.isValidDataFile(fileStatus)) continue;
-      Path partPathDir = fileStatus.getPath().getParent();
-      Preconditions.checkNotNull(partPathDir);
-      List<HdfsPartition> partitions = partsByPath.get(partPathDir);
-      // Skip if this file does not belong to any known partition.
-      if (partitions == null) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("File " + fileStatus.getPath().toString() + " doesn't correspond " +
-              " to a known partition. Skipping metadata load for this file.");
-        }
-        continue;
-      }
-
-      Preconditions.checkState(partitions.size() > 0);
-      // For the purpose of synthesizing block metadata, we assume that all partitions
-      // with the same location have the same file format.
-      FileDescriptor fd = FileDescriptor.createWithSynthesizedBlockMd(fileStatus,
-          partitions.get(0).getFileFormat(), hostIndex_);
-      // Update the partitions' metadata that this file belongs to.
-      for (HdfsPartition partition: partitions) {
-        partition.getFileDescriptors().add(fd);
-      }
+          + partDir.toString() + ": " + e.getMessage(), e);
     }
   }
 
@@ -610,8 +535,7 @@ public class HdfsTable extends Table {
    * Create HdfsPartition objects corresponding to 'msPartitions' and add them to this
    * table's partition list. Any partition metadata will be reset and loaded from
    * scratch. For each partition created, we load the block metadata for each data file
-   * under it. We optimize the block metadata loading by grouping together the name node
-   * requests for all the partitions under the table base directory into a single RPC.
+   * under it.
    *
    * If there are no partitions in the Hive metadata, a single partition is added with no
    * partition keys.
@@ -626,17 +550,6 @@ public class HdfsTable extends Table {
     // using createPartition() calls. A single partition path can correspond to multiple
     // partitions.
     HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
-    // Qualify to ensure isDescendantPath() works correctly.
-    Path tblLocation = FileSystemUtil.createFullyQualifiedPath(getHdfsBaseDirPath());
-    // List of directories that we scan for block locations. We optimize the block metadata
-    // loading to reduce the number of RPCs to the NN by separately loading partitions
-    // with default directory paths (under the base table directory) and non-default
-    // directory paths. For the former we issue a single RPC to the NN to load all the
-    // blocks from hdfsBaseDir_ and for the latter we load each of the partition directory
-    // separately.
-    // TODO: We can still do some advanced optimization by grouping all the partition
-    // directories under the same ancestor path up the tree.
-    Set<Path> dirsToLoad = Sets.newHashSet(tblLocation);
 
     if (msTbl.getPartitionKeysSize() == 0) {
       Preconditions.checkArgument(msPartitions == null || msPartitions.isEmpty());
@@ -644,6 +557,7 @@ public class HdfsTable extends Table {
       // We model partitions slightly differently to Hive - every file must exist in a
       // partition, so add a single partition with no keys which will get all the
       // files in the table's root directory.
+      Path tblLocation = FileSystemUtil.createFullyQualifiedPath(getHdfsBaseDirPath());
       HdfsPartition part = createPartition(msTbl.getSd(), null);
       partsByPath.put(tblLocation, Lists.newArrayList(part));
       if (isMarkedCached_) part.markCached();
@@ -670,7 +584,6 @@ public class HdfsTable extends Table {
           accessLevel_ = TAccessLevel.READ_ONLY;
         }
 
-        // Qualify to ensure isDescendantPath() works correctly.
         Path partDir = FileSystemUtil.createFullyQualifiedPath(
             new Path(msPartition.getSd().getLocation()));
         List<HdfsPartition> parts = partsByPath.get(partDir);
@@ -679,17 +592,10 @@ public class HdfsTable extends Table {
         } else {
           parts.add(partition);
         }
-
-        if (!dirsToLoad.contains(partDir) &&
-            !FileSystemUtil.isDescendantPath(partDir, tblLocation)) {
-          // This partition has a custom filesystem location. Load its file/block
-          // metadata separately by adding it to the list of dirs to load.
-          dirsToLoad.add(partDir);
-        }
       }
     }
 
-    loadMetadataAndDiskIds(dirsToLoad, partsByPath);
+    loadMetadataAndDiskIds(partsByPath);
   }
 
   /**
@@ -707,10 +613,7 @@ public class HdfsTable extends Table {
     Preconditions.checkNotNull(partDir);
     try {
       FileSystem fs = partDir.getFileSystem(CONF);
-      if (!FileSystemUtil.supportsStorageIds(fs)) {
-        synthesizeBlockMetadata(fs, partition);
-        return;
-      }
+      boolean synthesizeBlockMd = !FileSystemUtil.supportsStorageIds(fs);
       // Index the partition file descriptors by their file names for O(1) look ups.
       ImmutableMap<String, FileDescriptor> fileDescsByName = Maps.uniqueIndex(
           partition.getFileDescriptors(), new Function<FileDescriptor, String>() {
@@ -730,11 +633,17 @@ public class HdfsTable extends Table {
           if (fd == null || partition.isMarkedCached()
              || fd.getFileLength() != fileStatus.getLen()
              || fd.getModificationTime() != fileStatus.getModificationTime()) {
-            BlockLocation[] locations =
-                fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
-            fd = FileDescriptor.create(fileStatus, locations, fs, hostIndex_,
-                new Reference<Long>(Long.valueOf(0)));
+            if (synthesizeBlockMd) {
+              fd = FileDescriptor.createWithSynthesizedBlockMd(fileStatus,
+                  partition.getFileFormat(), hostIndex_);
+            } else {
+              BlockLocation[] locations =
+                  fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+              fd = FileDescriptor.create(fileStatus, locations, fs, hostIndex_,
+                  new Reference<Long>(Long.valueOf(0)));
+            }
           }
+          Preconditions.checkNotNull(fd);
           newFileDescs.add(fd);
         }
       }
@@ -751,27 +660,24 @@ public class HdfsTable extends Table {
    * use refreshFileMetadata(HdfsPartition).
    */
   private void loadFileMetadataFromScratch(HdfsPartition partition) {
-    Path partitionDirPath = partition.getLocationPath();
-    Set<Path> dirsToLoad = Sets.newHashSet(partitionDirPath);
     HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
-    partsByPath.put(partitionDirPath, Lists.newArrayList(partition));
-    loadMetadataAndDiskIds(dirsToLoad, partsByPath);
+    partsByPath.put(partition.getLocationPath(), Lists.newArrayList(partition));
+    loadMetadataAndDiskIds(partsByPath);
   }
 
   /**
-   * Helper method to load the block locations from each directory in 'locations'
-   * and filtering only the paths from 'partsByPath'. Also loads the disk IDs
-   * corresponding to these block locations.
+   * Helper method to load the block locations for each partition directory in
+   * partsByPath. 'partsByPath' maps each partition directory to the corresponding
+   * HdfsPartition objects.
    */
-  private void loadMetadataAndDiskIds(Set<Path> locations,
-      HashMap<Path, List<HdfsPartition>> partsByPath) {
-    LOG.info(String.format(
-        "Loading file and block metadata for %s partitions from %s paths: %s",
-        partsByPath.size(), locations.size(), getFullName()));
-    for (Path location: locations) loadBlockMetadata(location, partsByPath);
-    LOG.info(String.format(
-        "Loaded file and block metadata for %s partitions from %s paths: %s",
-        partsByPath.size(), locations.size(), getFullName()));
+  private void loadMetadataAndDiskIds(HashMap<Path, List<HdfsPartition>> partsByPath) {
+    LOG.info(String.format("Loading file and block metadata for %s paths: %s",
+        partsByPath.size(), getFullName()));
+    for (Path partDir: partsByPath.keySet()) {
+      loadBlockMetadata(partDir, partsByPath.get(partDir));
+    }
+    LOG.info(String.format("Loaded file and block metadata for %s paths: %s",
+        partsByPath.size(), getFullName()));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59d1aa6e/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index 2841993..c237426 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -443,32 +443,6 @@ public class FileSystemUtil {
   }
 
   /**
-   * Returns true if Path 'p' is a descendant of Path 'parent', false otherwise.
-   * This function relies on Path.equals() which requires paths to have the same
-   * schema and authority to compare equal. So both 'p' and 'parent' should either
-   * be qualified or unqualified paths for this function to behave as expected.
-   */
-  public static boolean isDescendantPath(Path p, Path parent) {
-    if (p == null || parent == null) return false;
-    while (!p.isRoot() && p.depth() != parent.depth()) p = p.getParent();
-    if (p.isRoot()) return false;
-    boolean result = p.equals(parent);
-    if (!result && LOG.isTraceEnabled()) {
-      // Add a message to the log if 'p' and 'parent' have inconsistent qualification.
-      URI pUri = p.toUri();
-      URI parentUri = parent.toUri();
-      boolean sameScheme = Objects.equal(pUri.getScheme(), parentUri.getScheme());
-      boolean sameAuthority =
-          Objects.equal(pUri.getAuthority(), parentUri.getAuthority());
-      if (!sameScheme || !sameAuthority) {
-        LOG.trace("Inconsistent schema or authority for paths: " +
-            p.toString() + " " + parent.toString());
-      }
-    }
-    return result;
-  }
-
-  /**
    * Returns the configuration.
    */
   public static Configuration getConfiguration() {
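
The reworked loadBlockMetadata() above lists one partition directory per call and reads
the BlockLocations that the listing itself returns, instead of grouping directories and
mapping every file back to its partition. Below is a minimal, self-contained sketch of
that per-partition listing pattern using only the public Hadoop FileSystem API; the
warehouse path and the hidden-file filter are illustrative stand-ins, not Impala's
FileDescriptor or isValidDataFile() code.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class PartitionListingSketch {
  // Lists the files directly under one partition directory (non-recursive, mirroring
  // listFiles(fs, partDir, false) in the patch) and prints the block locations that the
  // single listing call already carries, so no extra getFileBlockLocations() RPC is
  // needed per file.
  static void listPartition(FileSystem fs, Path partDir) throws IOException {
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(partDir, false);
    while (it.hasNext()) {
      LocatedFileStatus status = it.next();
      // Crude stand-in for isValidDataFile(): skip hidden/temporary files.
      if (status.getPath().getName().startsWith(".")) continue;
      for (BlockLocation loc : status.getBlockLocations()) {
        System.out.println(status.getPath() + " offset=" + loc.getOffset()
            + " length=" + loc.getLength()
            + " hosts=" + String.join(",", loc.getHosts()));
      }
    }
  }

  public static void main(String[] args) throws IOException {
    // Hypothetical partition directory; after this patch each partition is listed with
    // its own call rather than one recursive listing of the table directory.
    Path partDir = new Path(args.length > 0 ? args[0] : "hdfs:///warehouse/t/p=1");
    listPartition(partDir.getFileSystem(new Configuration()), partDir);
  }
}

Because the block locations come back with the listing, one listFiles() call per
partition directory costs one round of NameNode RPCs per directory, which is the same
cost the old grouped code was already paying internally according to the commit message.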

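For filesystems that do not expose block locations (S3 or ADLS, per the new javadoc),
the patch routes every file through FileDescriptor.createWithSynthesizedBlockMd(), which
the comments describe as splitting the file range into fixed-size blocks that the
scheduler treats as remote. The sketch below shows just that splitting arithmetic; the
SyntheticBlock class and the 128 MB block size are assumptions for illustration, not
Impala's actual types or defaults.

import java.util.ArrayList;
import java.util.List;

public class SyntheticBlockSketch {
  // Offset/length pair standing in for a synthesized block that has no real host list.
  static final class SyntheticBlock {
    final long offset;
    final long length;
    SyntheticBlock(long offset, long length) {
      this.offset = offset;
      this.length = length;
    }
  }

  // Splits a file of 'fileLength' bytes into consecutive blocks of at most 'blockSize'
  // bytes, the way block metadata is synthesized when the filesystem cannot report
  // real block locations.
  static List<SyntheticBlock> synthesizeBlocks(long fileLength, long blockSize) {
    List<SyntheticBlock> blocks = new ArrayList<>();
    for (long offset = 0; offset < fileLength; offset += blockSize) {
      blocks.add(new SyntheticBlock(offset, Math.min(blockSize, fileLength - offset)));
    }
    return blocks;
  }

  public static void main(String[] args) {
    // A 300 MB file with a 128 MB block size yields blocks of 128 MB, 128 MB and 44 MB.
    long mb = 1024L * 1024L;
    for (SyntheticBlock b : synthesizeBlocks(300 * mb, 128 * mb)) {
      System.out.println("offset=" + b.offset + " length=" + b.length);
    }
  }
}

In the patch itself the synthesized blocks additionally carry an invalid network address
so that the scheduler always treats them as remote.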