IMPALA-4789: Fix slow metadata loading due to inconsistent paths.

The fix for IMPALA-4172/IMPALA-3653 introduced a performance
regression for loading tables that have many partitions with:
1. Inconsistent HDFS path qualification or
2. A custom location (not under the table root dir)

For the first issue, consider a table whose root path is
'hdfs://localhost:8020/warehouse/tbl/'.
A partition with the unqualified location '/warehouse/tbl/p=1'
will not be recognized as a descendant of the table root
dir by FileSystemUtil.isDescendantPath() because of how
Path.equals() behaves, even if 'hdfs://localhost:8020' is the
default filesystem. Such partitions are incorrectly treated
as having a custom location and are loaded separately.
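
A minimal sketch of the underlying behavior (the variable
names are illustrative, not from the patch):

    import org.apache.hadoop.fs.Path;

    Path tblRoot = new Path("hdfs://localhost:8020/warehouse/tbl");
    Path partDir = new Path("/warehouse/tbl/p=1");
    // Path.equals() compares the underlying URIs, so the missing
    // scheme/authority on 'partDir' makes the comparison fail even
    // though both names resolve to the same directory.
    boolean isChild = partDir.getParent().equals(tblRoot);  // false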

There were two performance issues:
1. The code for loading the files/blocks of partitions with
   truly custom locations used an inefficient O(N^2) loop
   to determine the target partitions (sketched after this
   list).
2. Each partition that is incorrectly identified as having
   a custom path (e.g., due to inconsistent qualification)
   has its files/blocks loaded twice: once when the table
   root path is processed, and once when the 'custom'
   partition is processed.
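
The shape of the fix for the first issue, condensed from the
diff below (names follow the patch):

    // Before: every loadBlockMetadata() call scanned all partition
    // paths, so loading N custom partitions cost O(N^2)
    // isDescendantPath() checks.
    for (Map.Entry<Path, List<HdfsPartition>> entry : partsByPath.entrySet()) {
      if (!FileSystemUtil.isDescendantPath(entry.getKey(), dirPath)) continue;
      // ... clear and reload this partition's file descriptors ...
    }

    // After: a custom partition directory is resolved with a single
    // map lookup; the linear scan remains only when dirPath is an
    // ancestor of partition paths (i.e., the table root).
    List<HdfsPartition> dirPathPartitions = partsByPath.get(dirPath);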

This patch fixes the detection of partitions with custom
locations and improves the speed of loading such partitions.
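
The qualification fix, condensed from the diff below: both
sides of the ancestor check are now fully qualified up front.

    // Qualify to ensure isDescendantPath() works correctly.
    Path tblLocation =
        FileSystemUtil.createFullyQualifiedPath(getHdfsBaseDirPath());
    ...
    Path partDir = FileSystemUtil.createFullyQualifiedPath(
        new Path(msPartition.getSd().getLocation()));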

Change-Id: I8c881b7cb155032b82fba0e29350ca31de388d55
Reviewed-on: http://gerrit.cloudera.org:8080/5743
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7b8ffd35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7b8ffd35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7b8ffd35

Branch: refs/heads/master
Commit: 7b8ffd35534c11ae3caa048229effc97613cd34f
Parents: a0ec519
Author: Alex Behm <[email protected]>
Authored: Thu Jan 19 00:22:47 2017 -0800
Committer: Impala Public Jenkins <[email protected]>
Committed: Sat Jan 28 09:22:09 2017 +0000

----------------------------------------------------------------------
 .../org/apache/impala/catalog/HdfsTable.java    | 58 ++++++++++++++------
 .../apache/impala/common/FileSystemUtil.java    | 19 ++++++-
 2 files changed, 59 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b8ffd35/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index a6d0f47..795dae2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -246,6 +246,8 @@ public class HdfsTable extends Table {
    *   file under it recursively.
   * - For every valid data file, map it to a partition from 'partsByPath' (if one exists)
    *   and enumerate all its blocks and their corresponding hosts and disk IDs.
+   * Requires that 'dirPath' and all paths in 'partsByPath' have consistent qualification
+   * (either fully qualified or unqualified), for isDescendantPath().
    * TODO: Split this method into more logical methods for cleaner code.
    */
   private void loadBlockMetadata(Path dirPath,
@@ -257,15 +259,29 @@ public class HdfsTable extends Table {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Loading block md for " + name_ + " directory " + 
dirPath.toString());
       }
-      // Clear the state of partitions under dirPath since they are now updated based
-      // on the current snapshot of files in the directory.
-      for (Map.Entry<Path, List<HdfsPartition>> entry: partsByPath.entrySet()) {
-        Path partDir = entry.getKey();
-        if (!FileSystemUtil.isDescendantPath(partDir, dirPath)) continue;
-        for (HdfsPartition partition: entry.getValue()) {
+
+      // Clear the state of partitions under dirPath since they are going to be updated
+      // based on the current snapshot of files in the directory.
+      List<HdfsPartition> dirPathPartitions = partsByPath.get(dirPath);
+      if (dirPathPartitions != null) {
+        // The dirPath is a partition directory. This means the path is the root of an
+        // unpartitioned table, or the path of at least one partition.
+        for (HdfsPartition partition: dirPathPartitions) {
           partition.setFileDescriptors(new ArrayList<FileDescriptor>());
         }
+      } else {
+        // The dirPath is not a partition directory. We expect it to be an ancestor of
+        // partition paths (e.g., the table root). Clear all partitions whose paths are
+        // a descendant of dirPath.
+        for (Map.Entry<Path, List<HdfsPartition>> entry: partsByPath.entrySet()) {
+          Path partDir = entry.getKey();
+          if (!FileSystemUtil.isDescendantPath(partDir, dirPath)) continue;
+          for (HdfsPartition partition: entry.getValue()) {
+            partition.setFileDescriptors(new ArrayList<FileDescriptor>());
+          }
+        }
       }
+
       // For file systems that do not support BlockLocation API, we manually synthesize
       // block location metadata based on file formats.
       if (!FileSystemUtil.supportsStorageIds(fs)) {
@@ -671,7 +687,8 @@ public class HdfsTable extends Table {
     // using createPartition() calls. A single partition path can correspond to multiple
     // partitions.
     HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
-    Path tblLocation = getHdfsBaseDirPath();
+    // Qualify to ensure isDescendantPath() works correctly.
+    Path tblLocation = FileSystemUtil.createFullyQualifiedPath(getHdfsBaseDirPath());
     // List of directories that we scan for block locations. We optimize the block metadata
     // loading to reduce the number of RPCs to the NN by separately loading partitions
     // with default directory paths (under the base table directory) and non-default
@@ -681,7 +698,7 @@ public class HdfsTable extends Table {
     // TODO: We can still do some advanced optimization by grouping all the partition
     // directories under the same ancestor path up the tree.
     List<Path> dirsToLoad = Lists.newArrayList(tblLocation);
-    FileSystem fs = tblLocation.getFileSystem(CONF);
+
     if (msTbl.getPartitionKeysSize() == 0) {
       Preconditions.checkArgument(msPartitions == null || msPartitions.isEmpty());
       // This table has no partition key, which means it has no declared partitions.
@@ -692,6 +709,7 @@ public class HdfsTable extends Table {
       partsByPath.put(tblLocation, Lists.newArrayList(part));
       if (isMarkedCached_) part.markCached();
       addPartition(part);
+      FileSystem fs = tblLocation.getFileSystem(CONF);
       if (fs.exists(tblLocation)) {
         accessLevel_ = getAvailableAccessLevel(fs, tblLocation);
       }
@@ -714,13 +732,17 @@ public class HdfsTable extends Table {
           // WRITE_ONLY the table's access level should be NONE.
           accessLevel_ = TAccessLevel.READ_ONLY;
         }
-        Path partDir = new Path(msPartition.getSd().getLocation());
+
+        // Qualify to ensure isDescendantPath() works correctly.
+        Path partDir = FileSystemUtil.createFullyQualifiedPath(
+            new Path(msPartition.getSd().getLocation()));
         List<HdfsPartition> parts = partsByPath.get(partDir);
         if (parts == null) {
           partsByPath.put(partDir, Lists.newArrayList(partition));
         } else {
           parts.add(partition);
         }
+
         if (!dirsToLoad.contains(partDir) &&
             !FileSystemUtil.isDescendantPath(partDir, tblLocation)) {
           // This partition has a custom filesystem location. Load its file/block
@@ -734,10 +756,10 @@ public class HdfsTable extends Table {
   }
 
   private void loadMetadataAndDiskIds(HdfsPartition partition) throws CatalogException {
-      Path partDirPath = partition.getLocationPath();
-      HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
-      partsByPath.put(partDirPath, Lists.newArrayList(partition));
-      loadMetadataAndDiskIds(Lists.newArrayList(partDirPath), partsByPath);
+    Path partDirPath = partition.getLocationPath();
+    HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
+    partsByPath.put(partDirPath, Lists.newArrayList(partition));
+    loadMetadataAndDiskIds(Lists.newArrayList(partDirPath), partsByPath);
   }
 
   /**
@@ -747,11 +769,13 @@ public class HdfsTable extends Table {
    */
   private void loadMetadataAndDiskIds(List<Path> locations,
       HashMap<Path, List<HdfsPartition>> partsByPath) {
-    LOG.info(String.format("Loading file and block metadata for %s partitions: 
%s",
-        partsByPath.size(), getFullName()));
+    LOG.info(String.format(
+        "Loading file and block metadata for %s partitions from %s paths: %s",
+        partsByPath.size(), locations.size(), getFullName()));
     for (Path location: locations) { loadBlockMetadata(location, partsByPath); }
-    LOG.info(String.format("Loaded file and block metadata for %s partitions: 
%s",
-        partsByPath.size(), getFullName()));
+    LOG.info(String.format(
+        "Loaded file and block metadata for %s partitions from %s paths: %s",
+        partsByPath.size(), locations.size(), getFullName()));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b8ffd35/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index 4767837..f8c50b4 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.impala.catalog.HdfsCompression;
 import org.apache.log4j.Logger;
 
+import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 
 /**
@@ -424,12 +425,28 @@ public class FileSystemUtil {
 
   /**
   * Returns true if Path 'p' is a descendant of Path 'parent', false otherwise.
+   * This function relies on Path.equals() which requires paths to have the same
+   * scheme and authority to compare equal. So both 'p' and 'parent' should either
+   * be qualified or unqualified paths for this function to behave as expected.
    */
   public static boolean isDescendantPath(Path p, Path parent) {
     if (p == null || parent == null) return false;
     while (!p.isRoot() && p.depth() != parent.depth()) p = p.getParent();
     if (p.isRoot()) return false;
-    return p.equals(parent);
+    boolean result = p.equals(parent);
+    if (!result && LOG.isTraceEnabled()) {
+      // Add a message to the log if 'p' and 'parent' have inconsistent qualification.
+      URI pUri = p.toUri();
+      URI parentUri = parent.toUri();
+      boolean sameScheme = Objects.equal(pUri.getScheme(), parentUri.getScheme());
+      boolean sameAuthority =
+          Objects.equal(pUri.getAuthority(), parentUri.getAuthority());
+      if (!sameScheme || !sameAuthority) {
+        LOG.trace("Inconsistent schema or authority for paths: " +
+            p.toString() + " " + parent.toString());
+      }
+    }
+    return result;
   }
 
   /**
