Repository: incubator-impala
Updated Branches:
  refs/heads/master 5f2a44069 -> 8149d0e57


IMPALA-4847: Simplify HdfsTable block metadata loading code

This commit is part of the groundwork for the upcoming
multi-threaded block metadata loading patches.

The patch for IMPALA-4172 introduced code that groups the block
location requests for partition directories that reside under the
table directory into a single call to the NN in order to reduce the
number of RPCs. However, it turns out that the HDFS client library
internally makes one RPC per directory, defeating the purpose of
the optimization. It also made the code unnecessarily complex,
since each file had to be mapped back to its corresponding
partition at runtime.

This patch undoes that optimization and makes the HDFS calls per
partition, which is much easier to understand. It also helps the
upcoming patch on multi-threaded block metadata loading, since there
is much less shared state when loading multiple partitions in
parallel.
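
For illustration, the per-partition access pattern boils down to the
following standalone sketch. It uses only the public Hadoop FileSystem
API and is purely illustrative; the actual change is in
HdfsTable.loadMetadataAndDiskIds()/loadBlockMetadata() in the diff
that follows, and the class and method names here are made up.

  import java.io.IOException;
  import java.util.List;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.LocatedFileStatus;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.fs.RemoteIterator;

  public class PerPartitionListing {
    // One non-recursive listing call per partition directory, so every file
    // returned already belongs to exactly one partition and no file-to-partition
    // mapping is needed afterwards.
    public static void listPerPartition(Configuration conf, List<Path> partitionDirs)
        throws IOException {
      for (Path partDir : partitionDirs) {
        FileSystem fs = partDir.getFileSystem(conf);
        RemoteIterator<LocatedFileStatus> it = fs.listFiles(partDir, false);
        while (it.hasNext()) {
          LocatedFileStatus status = it.next();
          System.out.println(partDir + " -> " + status.getPath().getName()
              + " (" + status.getBlockLocations().length + " blocks)");
        }
      }
    }
  }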

Change-Id: I963d647bd2ba11e3843c6ef2ac6be113c74280bf
Reviewed-on: http://gerrit.cloudera.org:8080/7652
Reviewed-by: Bharath Vissapragada <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/59d1aa6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/59d1aa6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/59d1aa6e

Branch: refs/heads/master
Commit: 59d1aa6ea7788398574263c4a716b41c8b250ba6
Parents: 5f2a440
Author: Bharath Vissapragada <[email protected]>
Authored: Mon Jul 17 11:55:51 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Mon Aug 28 20:03:06 2017 +0000

----------------------------------------------------------------------
 .../org/apache/impala/catalog/HdfsTable.java    | 214 ++++++-------------
 .../apache/impala/common/FileSystemUtil.java    |  26 ---
 2 files changed, 60 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59d1aa6e/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 6aca8dc..45ebfc9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -249,79 +249,56 @@ public class HdfsTable extends Table {
   }
 
   /**
-   * Drops and re-loads the block metadata for all partitions in 'partsByPath' whose
-   * location is under the given 'dirPath'. It involves the following steps:
+   * Drops and re-loads the block metadata of all the partitions in 'partitions' that
+   * correspond to the path 'partDir'. 'partDir' may belong to any file system that
+   * implements the hadoop's FileSystem interface (like HDFS, S3, ADLS etc.). It involves
+   * the following steps:
    * - Clear the current block metadata of the partitions.
-   * - Call FileSystem.listStatus() on 'dirPath' to fetch the BlockLocations for each
-   *   file under it recursively.
-   * - For every valid data file, map it to a partition from 'partsByPath' (if one exists)
-   *   and enumerate all its blocks and their corresponding hosts and disk IDs.
-   * Requires that 'dirPath' and all paths in 'partsByPath' have consistent qualification
-   * (either fully qualified or unqualified), for isDescendantPath().
-   * TODO: Split this method into more logical methods for cleaner code.
+   * - Call FileSystem.listFiles() on 'partDir' to fetch the FileStatus and BlockLocations
+   *   for each file under it.
+   * - For every valid data file, enumerate all its blocks and their corresponding hosts
+   *   and disk IDs if the underlying file system supports the block locations API
+   *   (for ex: HDFS). For other file systems (like S3) we synthesize the block metadata
+   *   manually by splitting the file ranges into fixed size blocks.
+   * For filesystems that don't support BlockLocation API, synthesize file blocks
+   * by manually splitting the file range into fixed-size blocks.  That way, scan
+   * ranges can be derived from file blocks as usual.  All synthesized blocks are given
+   * an invalid network address so that the scheduler will treat them as remote.
    */
-  private void loadBlockMetadata(Path dirPath,
-      HashMap<Path, List<HdfsPartition>> partsByPath) {
+  private void loadBlockMetadata(Path partDir, List<HdfsPartition> partitions) {
     try {
       // No need to load blocks for empty partitions list.
-      if (partsByPath.size() == 0) return;
+      if (partitions == null || partitions.isEmpty()) return;
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Loading block md for " + name_ + " directory " + 
dirPath.toString());
+        LOG.trace("Loading block md for " + name_ + " directory " + 
partDir.toString());
       }
 
       // Clear the state of partitions under dirPath since they are going to be updated
       // based on the current snapshot of files in the directory.
-      List<HdfsPartition> dirPathPartitions = partsByPath.get(dirPath);
-      if (dirPathPartitions != null) {
-        // The dirPath is a partition directory. This means the path is the root of an
-        // unpartitioned table, or the path of at least one partition.
-        for (HdfsPartition partition: dirPathPartitions) {
-          partition.setFileDescriptors(new ArrayList<FileDescriptor>());
-        }
-      } else {
-        // The dirPath is not a partition directory. We expect it to be an ancestor of
-        // partition paths (e.g., the table root). Clear all partitions whose paths are
-        // a descendant of dirPath.
-        for (Map.Entry<Path, List<HdfsPartition>> entry: partsByPath.entrySet()) {
-          Path partDir = entry.getKey();
-          if (!FileSystemUtil.isDescendantPath(partDir, dirPath)) continue;
-          for (HdfsPartition partition: entry.getValue()) {
-            partition.setFileDescriptors(new ArrayList<FileDescriptor>());
-          }
-        }
-      }
-
-      FileSystem fs = dirPath.getFileSystem(CONF);
-      // For file systems that do not support BlockLocation API, we manually synthesize
-      // block location metadata based on file formats.
-      if (!FileSystemUtil.supportsStorageIds(fs)) {
-        synthesizeBlockMetadata(fs, dirPath, partsByPath);
-        return;
+      for (HdfsPartition partition: partitions) {
+        partition.setFileDescriptors(new ArrayList<FileDescriptor>());
       }
 
+      FileSystem fs = partDir.getFileSystem(CONF);
+      boolean synthesizeBlockMd = !FileSystemUtil.supportsStorageIds(fs);
       RemoteIterator<LocatedFileStatus> fileStatusIter =
-          FileSystemUtil.listFiles(fs, dirPath, true);
+          FileSystemUtil.listFiles(fs, partDir, false);
       if (fileStatusIter == null) return;
       Reference<Long> numUnknownDiskIds = new Reference<Long>(Long.valueOf(0));
       while (fileStatusIter.hasNext()) {
         LocatedFileStatus fileStatus = fileStatusIter.next();
         if (!FileSystemUtil.isValidDataFile(fileStatus)) continue;
-        // Find the partition that this file belongs (if any).
-        Path partPathDir = fileStatus.getPath().getParent();
-        Preconditions.checkNotNull(partPathDir);
-
-        List<HdfsPartition> partitions = partsByPath.get(partPathDir);
-        // Skip if this file does not belong to any known partition.
-        if (partitions == null) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("File " + fileStatus.getPath().toString() + " doesn't 
correspond " +
-                " to a known partition. Skipping metadata load for this 
file.");
-          }
-          continue;
+        FileDescriptor fd = null;
+        // Block locations are manually synthesized if the underlying fs does not support
+        // the block location API.
+        if (synthesizeBlockMd) {
+          fd = FileDescriptor.createWithSynthesizedBlockMd(fileStatus,
+              partitions.get(0).getFileFormat(), hostIndex_);
+        } else {
+          fd = FileDescriptor.create(fileStatus,
+              fileStatus.getBlockLocations(), fs, hostIndex_, numUnknownDiskIds);
         }
-
-        FileDescriptor fd = FileDescriptor.create(fileStatus,
-            fileStatus.getBlockLocations(), fs, hostIndex_, numUnknownDiskIds);
+        Preconditions.checkNotNull(fd);
         // Update the partitions' metadata that this file belongs to.
         for (HdfsPartition partition: partitions) {
           partition.getFileDescriptors().add(fd);
@@ -337,59 +314,7 @@ public class HdfsTable extends Table {
       }
     } catch (IOException e) {
       throw new RuntimeException("Error loading block metadata for directory "
-          + dirPath.toString() + ": " + e.getMessage(), e);
-    }
-  }
-
-  /**
-   * Synthesize the block metadata for a given HdfsPartition object. Should only
-   * be called for FileSystems that do not support storage IDs.
-   */
-  private void synthesizeBlockMetadata(FileSystem fs, HdfsPartition partition)
-      throws IOException {
-    Preconditions.checkState(!FileSystemUtil.supportsStorageIds(fs));
-    HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
-    Path partitionPath = partition.getLocationPath();
-    partition.setFileDescriptors(new ArrayList<FileDescriptor>());
-    partsByPath.put(partitionPath, Lists.newArrayList(partition));
-    synthesizeBlockMetadata(fs, partitionPath, partsByPath);
-  }
-
-  /**
-   * For filesystems that don't support BlockLocation API, synthesize file blocks
-   * by manually splitting the file range into fixed-size blocks.  That way, scan
-   * ranges can be derived from file blocks as usual.  All synthesized blocks are given
-   * an invalid network address so that the scheduler will treat them as remote.
-   */
-  private void synthesizeBlockMetadata(FileSystem fs, Path dirPath, HashMap<Path,
-      List<HdfsPartition>> partsByPath) throws IOException {
-    RemoteIterator<LocatedFileStatus> fileStatusIter =
-        FileSystemUtil.listFiles(fs, dirPath, true);
-    if (fileStatusIter == null) return;
-    while (fileStatusIter.hasNext()) {
-      LocatedFileStatus fileStatus = fileStatusIter.next();
-      if (!FileSystemUtil.isValidDataFile(fileStatus)) continue;
-      Path partPathDir = fileStatus.getPath().getParent();
-      Preconditions.checkNotNull(partPathDir);
-      List<HdfsPartition> partitions = partsByPath.get(partPathDir);
-      // Skip if this file does not belong to any known partition.
-      if (partitions == null) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("File " + fileStatus.getPath().toString() + " doesn't 
correspond " +
-              " to a known partition. Skipping metadata load for this file.");
-        }
-        continue;
-      }
-
-      Preconditions.checkState(partitions.size() > 0);
-      // For the purpose of synthesizing block metadata, we assume that all partitions
-      // with the same location have the same file format.
-      FileDescriptor fd = FileDescriptor.createWithSynthesizedBlockMd(fileStatus,
-          partitions.get(0).getFileFormat(), hostIndex_);
-      // Update the partitions' metadata that this file belongs to.
-      for (HdfsPartition partition: partitions) {
-        partition.getFileDescriptors().add(fd);
-      }
+          + partDir.toString() + ": " + e.getMessage(), e);
     }
   }
 
@@ -610,8 +535,7 @@ public class HdfsTable extends Table {
    * Create HdfsPartition objects corresponding to 'msPartitions' and add them to this
    * table's partition list. Any partition metadata will be reset and loaded from
    * scratch. For each partition created, we load the block metadata for each data file
-   * under it. We optimize the block metadata loading by grouping together the name node
-   * requests for all the partitions under the table base directory into a single RPC.
+   * under it.
    *
    * If there are no partitions in the Hive metadata, a single partition is added with no
    * partition keys.
@@ -626,17 +550,6 @@ public class HdfsTable extends Table {
     // using createPartition() calls. A single partition path can correspond to multiple
     // partitions.
     HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
-    // Qualify to ensure isDescendantPath() works correctly.
-    Path tblLocation = FileSystemUtil.createFullyQualifiedPath(getHdfsBaseDirPath());
-    // List of directories that we scan for block locations. We optimize the block metadata
-    // loading to reduce the number of RPCs to the NN by separately loading partitions
-    // with default directory paths (under the base table directory) and non-default
-    // directory paths. For the former we issue a single RPC to the NN to load all the
-    // blocks from hdfsBaseDir_ and for the latter we load each of the partition directory
-    // separately.
-    // TODO: We can still do some advanced optimization by grouping all the partition
-    // directories under the same ancestor path up the tree.
-    Set<Path> dirsToLoad = Sets.newHashSet(tblLocation);
 
     if (msTbl.getPartitionKeysSize() == 0) {
       Preconditions.checkArgument(msPartitions == null || msPartitions.isEmpty());
@@ -644,6 +557,7 @@ public class HdfsTable extends Table {
       // We model partitions slightly differently to Hive - every file must exist in a
       // partition, so add a single partition with no keys which will get all the
       // files in the table's root directory.
+      Path tblLocation = FileSystemUtil.createFullyQualifiedPath(getHdfsBaseDirPath());
       HdfsPartition part = createPartition(msTbl.getSd(), null);
       partsByPath.put(tblLocation, Lists.newArrayList(part));
       if (isMarkedCached_) part.markCached();
@@ -670,7 +584,6 @@ public class HdfsTable extends Table {
           accessLevel_ = TAccessLevel.READ_ONLY;
         }
 
-        // Qualify to ensure isDescendantPath() works correctly.
         Path partDir = FileSystemUtil.createFullyQualifiedPath(
             new Path(msPartition.getSd().getLocation()));
         List<HdfsPartition> parts = partsByPath.get(partDir);
@@ -679,17 +592,10 @@ public class HdfsTable extends Table {
         } else {
           parts.add(partition);
         }
-
-        if (!dirsToLoad.contains(partDir) &&
-            !FileSystemUtil.isDescendantPath(partDir, tblLocation)) {
-          // This partition has a custom filesystem location. Load its file/block
-          // metadata separately by adding it to the list of dirs to load.
-          dirsToLoad.add(partDir);
-        }
       }
     }
 
-    loadMetadataAndDiskIds(dirsToLoad, partsByPath);
+    loadMetadataAndDiskIds(partsByPath);
   }
 
   /**
@@ -707,10 +613,7 @@ public class HdfsTable extends Table {
     Preconditions.checkNotNull(partDir);
     try {
       FileSystem fs = partDir.getFileSystem(CONF);
-      if (!FileSystemUtil.supportsStorageIds(fs)) {
-        synthesizeBlockMetadata(fs, partition);
-        return;
-      }
+      boolean synthesizeBlockMd = !FileSystemUtil.supportsStorageIds(fs);
       // Index the partition file descriptors by their file names for O(1) look ups.
       ImmutableMap<String, FileDescriptor> fileDescsByName = Maps.uniqueIndex(
           partition.getFileDescriptors(), new Function<FileDescriptor, String>() {
@@ -730,11 +633,17 @@ public class HdfsTable extends Table {
           if (fd == null || partition.isMarkedCached() ||
               fd.getFileLength() != fileStatus.getLen() ||
               fd.getModificationTime() != fileStatus.getModificationTime()) {
-            BlockLocation[] locations =
-                fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
-            fd = FileDescriptor.create(fileStatus, locations, fs, hostIndex_,
-                new Reference<Long>(Long.valueOf(0)));
+            if (synthesizeBlockMd) {
+              fd = FileDescriptor.createWithSynthesizedBlockMd(fileStatus,
+                  partition.getFileFormat(), hostIndex_);
+            } else {
+              BlockLocation[] locations =
+                  fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+              fd = FileDescriptor.create(fileStatus, locations, fs, hostIndex_,
+                  new Reference<Long>(Long.valueOf(0)));
+            }
           }
+          Preconditions.checkNotNull(fd);
           newFileDescs.add(fd);
         }
       }
@@ -751,27 +660,24 @@ public class HdfsTable extends Table {
    * use refreshFileMetadata(HdfsPartition).
    */
   private void loadFileMetadataFromScratch(HdfsPartition partition) {
-    Path partitionDirPath = partition.getLocationPath();
-    Set<Path> dirsToLoad = Sets.newHashSet(partitionDirPath);
     HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
-    partsByPath.put(partitionDirPath, Lists.newArrayList(partition));
-    loadMetadataAndDiskIds(dirsToLoad, partsByPath);
+    partsByPath.put(partition.getLocationPath(), Lists.newArrayList(partition));
+    loadMetadataAndDiskIds(partsByPath);
   }
 
   /**
-   * Helper method to load the block locations from each directory in 'locations'
-   * and filtering only the paths from 'partsByPath'. Also loads the disk IDs
-   * corresponding to these block locations.
+   * Helper method to load the block locations for each partition directory in
+   * partsByPath. 'partsByPath' maps each partition directory to the corresponding
+   * HdfsPartition objects.
    */
-  private void loadMetadataAndDiskIds(Set<Path> locations,
-      HashMap<Path, List<HdfsPartition>> partsByPath) {
-    LOG.info(String.format(
-        "Loading file and block metadata for %s partitions from %s paths: %s",
-        partsByPath.size(), locations.size(), getFullName()));
-    for (Path location: locations) loadBlockMetadata(location, partsByPath);
-    LOG.info(String.format(
-        "Loaded file and block metadata for %s partitions from %s paths: %s",
-        partsByPath.size(), locations.size(), getFullName()));
+  private void loadMetadataAndDiskIds(HashMap<Path, List<HdfsPartition>> partsByPath) {
+    LOG.info(String.format("Loading file and block metadata for %s paths: %s",
+        partsByPath.size(), getFullName()));
+    for (Path partDir: partsByPath.keySet()) {
+      loadBlockMetadata(partDir, partsByPath.get(partDir));
+    }
+    LOG.info(String.format("Loaded file and block metadata for %s paths: %s",
+        partsByPath.size(), getFullName()));
   }
 
   /**
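
The Javadoc in the hunk above mentions synthesizing block metadata for
filesystems without a BlockLocation API (e.g. S3) by splitting each
file into fixed-size ranges. The actual logic lives in
FileDescriptor.createWithSynthesizedBlockMd(), which is outside this
diff; the standalone sketch below only illustrates the idea, and the
block size and the (offset, length) representation are assumptions.

  import java.util.ArrayList;
  import java.util.List;

  public class SynthesizedBlocks {
    // Split a file of 'fileLength' bytes into consecutive (offset, length) ranges
    // of at most 'blockSize' bytes. A real implementation would also attach an
    // invalid network address so the scheduler treats every block as remote.
    public static List<long[]> synthesize(long fileLength, long blockSize) {
      List<long[]> blocks = new ArrayList<>();
      for (long offset = 0; offset < fileLength; offset += blockSize) {
        blocks.add(new long[] {offset, Math.min(blockSize, fileLength - offset)});
      }
      return blocks;
    }

    public static void main(String[] args) {
      // A 300 MB file with 128 MB synthetic blocks yields ranges of 128, 128 and 44 MB.
      for (long[] b : synthesize(300L << 20, 128L << 20)) {
        System.out.println("offset=" + b[0] + " length=" + b[1]);
      }
    }
  }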

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59d1aa6e/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index 2841993..c237426 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -443,32 +443,6 @@ public class FileSystemUtil {
   }
 
   /**
-   * Returns true if Path 'p' is a descendant of Path 'parent', false otherwise.
-   * This function relies on Path.equals() which requires paths to have the same
-   * schema and authority to compare equal. So both 'p' and 'parent' should either
-   * be qualified or unqualified paths for this function to behave as expected.
-   */
-  public static boolean isDescendantPath(Path p, Path parent) {
-    if (p == null || parent == null) return false;
-    while (!p.isRoot() && p.depth() != parent.depth()) p = p.getParent();
-    if (p.isRoot()) return false;
-    boolean result = p.equals(parent);
-    if (!result && LOG.isTraceEnabled()) {
-      // Add a message to the log if 'p' and 'parent' have inconsistent qualification.
-      URI pUri = p.toUri();
-      URI parentUri = parent.toUri();
-      boolean sameScheme = Objects.equal(pUri.getScheme(), parentUri.getScheme());
-      boolean sameAuthority =
-          Objects.equal(pUri.getAuthority(), parentUri.getAuthority());
-      if (!sameScheme || !sameAuthority) {
-        LOG.trace("Inconsistent schema or authority for paths: " +
-            p.toString() + " " + parent.toString());
-      }
-    }
-    return result;
-  }
-
-  /**
    * Returns the configuration.
    */
   public static Configuration getConfiguration() {
