IMPALA-4789: Fix slow metadata loading due to inconsistent paths. The fix for IMPALA-4172/IMPALA-3653 introduced a performance regression for loading tables that have many partitions with: 1. Inconsistent HDFS path qualification or 2. A custom location (not under the table root dir)
For the first issue consider a table whose root path is at 'hdfs://localhost:8020/warehouse/tbl/'. A partition with an unqualified location '/warehouse/tbl/p=1' will not be recognized as being a descendant of the table root dir by FileSystemUtil.isDescendantPath() because of how Path.equals() behaves, even if 'hdfs://localhost:8020' is the default filesystem. Such partitions are incorrectly recognized as having a custom location and are loaded separately. There were two performance issues: 1. The code for loading the files/blocks of partitions with truly custom locations was inefficient with an O(N^2) loop for determining the target partitions. 2. Each partition that is incorrectly identified as having a custom path (e.g. due to inconsistent qualification), is going to have its files/blocks loaded twice. Once when the table root path is processed, and once when the 'custom' partition is processed. This patch fixes the detection of partitions with custom locations, and improves the speed of loading partitions with custom locations. 
Change-Id: I8c881b7cb155032b82fba0e29350ca31de388d55 Reviewed-on: http://gerrit.cloudera.org:8080/5743 Reviewed-by: Alex Behm <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7b8ffd35 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7b8ffd35 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7b8ffd35 Branch: refs/heads/master Commit: 7b8ffd35534c11ae3caa048229effc97613cd34f Parents: a0ec519 Author: Alex Behm <[email protected]> Authored: Thu Jan 19 00:22:47 2017 -0800 Committer: Impala Public Jenkins <[email protected]> Committed: Sat Jan 28 09:22:09 2017 +0000 ---------------------------------------------------------------------- .../org/apache/impala/catalog/HdfsTable.java | 58 ++++++++++++++------ .../apache/impala/common/FileSystemUtil.java | 19 ++++++- 2 files changed, 59 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b8ffd35/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index a6d0f47..795dae2 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -246,6 +246,8 @@ public class HdfsTable extends Table { * file under it recursively. * - For every valid data file, map it to a partition from 'partsByPath' (if one exists) * and enumerate all its blocks and their corresponding hosts and disk IDs. + * Requires that 'dirPath' and all paths in 'partsByPath' have consistent qualification + * (either fully qualified or unqualified), for isDescendantPath(). 
* TODO: Split this method into more logical methods for cleaner code. */ private void loadBlockMetadata(Path dirPath, @@ -257,15 +259,29 @@ public class HdfsTable extends Table { if (LOG.isTraceEnabled()) { LOG.trace("Loading block md for " + name_ + " directory " + dirPath.toString()); } - // Clear the state of partitions under dirPath since they are now updated based - // on the current snapshot of files in the directory. - for (Map.Entry<Path, List<HdfsPartition>> entry: partsByPath.entrySet()) { - Path partDir = entry.getKey(); - if (!FileSystemUtil.isDescendantPath(partDir, dirPath)) continue; - for (HdfsPartition partition: entry.getValue()) { + + // Clear the state of partitions under dirPath since they are going to be updated + // based on the current snapshot of files in the directory. + List<HdfsPartition> dirPathPartitions = partsByPath.get(dirPath); + if (dirPathPartitions != null) { + // The dirPath is a partition directory. This means the path is the root of an + // unpartitioned table, or the path of at least one partition. + for (HdfsPartition partition: dirPathPartitions) { partition.setFileDescriptors(new ArrayList<FileDescriptor>()); } + } else { + // The dirPath is not a partition directory. We expect it to be an ancestor of + // partition paths (e.g., the table root). Clear all partitions whose paths are + // a descendant of dirPath. + for (Map.Entry<Path, List<HdfsPartition>> entry: partsByPath.entrySet()) { + Path partDir = entry.getKey(); + if (!FileSystemUtil.isDescendantPath(partDir, dirPath)) continue; + for (HdfsPartition partition: entry.getValue()) { + partition.setFileDescriptors(new ArrayList<FileDescriptor>()); + } + } } + // For file systems that do not support BlockLocation API, we manually synthesize // block location metadata based on file formats. if (!FileSystemUtil.supportsStorageIds(fs)) { @@ -671,7 +687,8 @@ public class HdfsTable extends Table { // using createPartition() calls. 
A single partition path can correspond to multiple // partitions. HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap(); - Path tblLocation = getHdfsBaseDirPath(); + // Qualify to ensure isDescendantPath() works correctly. + Path tblLocation = FileSystemUtil.createFullyQualifiedPath(getHdfsBaseDirPath()); // List of directories that we scan for block locations. We optimize the block metadata // loading to reduce the number of RPCs to the NN by separately loading partitions // with default directory paths (under the base table directory) and non-default @@ -681,7 +698,7 @@ public class HdfsTable extends Table { // TODO: We can still do some advanced optimization by grouping all the partition // directories under the same ancestor path up the tree. List<Path> dirsToLoad = Lists.newArrayList(tblLocation); - FileSystem fs = tblLocation.getFileSystem(CONF); + if (msTbl.getPartitionKeysSize() == 0) { Preconditions.checkArgument(msPartitions == null || msPartitions.isEmpty()); // This table has no partition key, which means it has no declared partitions. @@ -692,6 +709,7 @@ public class HdfsTable extends Table { partsByPath.put(tblLocation, Lists.newArrayList(part)); if (isMarkedCached_) part.markCached(); addPartition(part); + FileSystem fs = tblLocation.getFileSystem(CONF); if (fs.exists(tblLocation)) { accessLevel_ = getAvailableAccessLevel(fs, tblLocation); } @@ -714,13 +732,17 @@ public class HdfsTable extends Table { // WRITE_ONLY the table's access level should be NONE. accessLevel_ = TAccessLevel.READ_ONLY; } - Path partDir = new Path(msPartition.getSd().getLocation()); + + // Qualify to ensure isDescendantPath() works correctly. 
+ Path partDir = FileSystemUtil.createFullyQualifiedPath( + new Path(msPartition.getSd().getLocation())); List<HdfsPartition> parts = partsByPath.get(partDir); if (parts == null) { partsByPath.put(partDir, Lists.newArrayList(partition)); } else { parts.add(partition); } + if (!dirsToLoad.contains(partDir) && !FileSystemUtil.isDescendantPath(partDir, tblLocation)) { // This partition has a custom filesystem location. Load its file/block @@ -734,10 +756,10 @@ public class HdfsTable extends Table { } private void loadMetadataAndDiskIds(HdfsPartition partition) throws CatalogException { - Path partDirPath = partition.getLocationPath(); - HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap(); - partsByPath.put(partDirPath, Lists.newArrayList(partition)); - loadMetadataAndDiskIds(Lists.newArrayList(partDirPath), partsByPath); + Path partDirPath = partition.getLocationPath(); + HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap(); + partsByPath.put(partDirPath, Lists.newArrayList(partition)); + loadMetadataAndDiskIds(Lists.newArrayList(partDirPath), partsByPath); } /** @@ -747,11 +769,13 @@ public class HdfsTable extends Table { */ private void loadMetadataAndDiskIds(List<Path> locations, HashMap<Path, List<HdfsPartition>> partsByPath) { - LOG.info(String.format("Loading file and block metadata for %s partitions: %s", - partsByPath.size(), getFullName())); + LOG.info(String.format( + "Loading file and block metadata for %s partitions from %s paths: %s", + partsByPath.size(), locations.size(), getFullName())); for (Path location: locations) { loadBlockMetadata(location, partsByPath); } - LOG.info(String.format("Loaded file and block metadata for %s partitions: %s", - partsByPath.size(), getFullName())); + LOG.info(String.format( + "Loaded file and block metadata for %s partitions from %s paths: %s", + partsByPath.size(), locations.size(), getFullName())); } /** 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b8ffd35/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java index 4767837..f8c50b4 100644 --- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java +++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.impala.catalog.HdfsCompression; import org.apache.log4j.Logger; +import com.google.common.base.Objects; import com.google.common.base.Preconditions; /** @@ -424,12 +425,28 @@ public class FileSystemUtil { /** * Returns true if Path 'p' is a descendant of Path 'parent', false otherwise. + * This function relies on Path.equals() which requires paths to have the same + * schema and authority to compare equal. So both 'p' and 'parent' should either + * be qualified or unqualified paths for this function to behave as expected. */ public static boolean isDescendantPath(Path p, Path parent) { if (p == null || parent == null) return false; while (!p.isRoot() && p.depth() != parent.depth()) p = p.getParent(); if (p.isRoot()) return false; - return p.equals(parent); + boolean result = p.equals(parent); + if (!result && LOG.isTraceEnabled()) { + // Add a message to the log if 'p' and 'parent' have inconsistent qualification. + URI pUri = p.toUri(); + URI parentUri = parent.toUri(); + boolean sameScheme = Objects.equal(pUri.getScheme(), parentUri.getScheme()); + boolean sameAuthority = + Objects.equal(pUri.getAuthority(), parentUri.getAuthority()); + if (!sameScheme || !sameAuthority) { + LOG.trace("Inconsistent schema or authority for paths: " + + p.toString() + " " + parent.toString()); + } + } + return result; } /**
