IMPALA-5429: Multi-threaded block metadata loading

Implements multi-threaded block metadata loading on the Catalog
server, where we fetch block metadata for multiple partitions of a
single table in parallel. The number of threads used to load the
metadata is controlled by the following two parameters (set at Catalog
server startup and applied to each table load):

-max_hdfs_partitions_parallel_load(default=5)
-max_nonhdfs_partitions_parallel_load(default=20)

We use different thread pool sizes for HDFS and non-HDFS tables since
non-HDFS file systems support a much higher throughput of RPC calls for
listStatus/listFiles. Based on our experiments, S3 showed a linear
speedup (up to ~113x) with an increasing number of loading threads,
whereas the HDFS speedup was limited to ~5x in non-secure clusters and
~3.7x in secure clusters. We narrowed it down to scalability
bottlenecks in the HDFS RPC implementation (HADOOP-14558) on both the
server and the client side.
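
As a rough illustration of the thread pool approach described above, here
is a minimal, self-contained Java sketch. It is not the code from this
patch: the class name, loadPath() and the example paths are hypothetical
stand-ins, and the per-path work is faked by returning the path length.
It only shows the general shape: size the pool as min(number of paths,
configured maximum), submit one task per path, then wait on the futures
and count failures.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ParallelPathLoadSketch {
  // Hypothetical stand-in for the per-path work (listing files, fetching
  // block locations). Here it just returns the length of the path string.
  static int loadPath(String path) { return path.length(); }

  // The pool never needs more threads than there are paths to load.
  static int poolSize(int numPaths, int maxThreads) {
    return Math.min(numPaths, maxThreads);
  }

  // Loads all paths in parallel and returns the results in submission order.
  static List<Integer> loadAll(List<String> paths, int maxThreads) {
    List<Integer> results = new ArrayList<>();
    if (paths.isEmpty()) return results;
    ExecutorService pool =
        Executors.newFixedThreadPool(poolSize(paths.size(), maxThreads));
    try {
      List<Future<Integer>> pending = new ArrayList<>();
      for (String p : paths) {
        Callable<Integer> task = () -> loadPath(p);
        pending.add(pool.submit(task));
      }
      // Wait for every task; count failures instead of stopping at the first.
      int failed = 0;
      for (Future<Integer> f : pending) {
        try {
          results.add(f.get());
        } catch (ExecutionException | InterruptedException e) {
          ++failed;
        }
      }
      if (failed > 0) {
        throw new IllegalStateException(failed + " path(s) failed to load");
      }
      return results;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    List<String> paths = Arrays.asList("/t/p=1", "/t/p=2", "/t/p=3");
    System.out.println(loadAll(paths, 5));
  }
}
```

With three paths and a maximum of 5 threads, the sketch creates a pool of
3 threads; with 100K paths it would still create only 5.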

One thing to note here is that the thread-pool-based metadata fetching
is implemented only for loading HDFS block metadata and not for loading
HMS partition information. Our experiments showed that, while loading
large partitioned tables, ~90% of the time is spent connecting to the
NameNode (NN) and loading the HDFS block information; optimizing the
remaining ~10% would make the code unnecessarily complex without much
gain.
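
To put the ~90% / ~10% split in perspective, the following sketch applies
Amdahl's law. It assumes (this is an assumption, not a measurement from
this patch) that the ~90% share is exactly the part accelerated by the
thread pool and that the remaining ~10% stays serial. The class and method
names are hypothetical.

```java
class AmdahlSketch {
  // Overall speedup when a fraction 'parallelFraction' of the run time is
  // accelerated by 'stageSpeedup' and the rest is left unchanged.
  static double overallSpeedup(double parallelFraction, double stageSpeedup) {
    return 1.0 / ((1.0 - parallelFraction) + parallelFraction / stageSpeedup);
  }

  public static void main(String[] args) {
    // Speedups of the block metadata stage quoted in the commit message.
    double[] stageSpeedups = {3.7, 5.0, 113.0};
    for (double s : stageSpeedups) {
      System.out.printf("stage speedup %.1fx -> overall %.2fx%n",
          s, overallSpeedup(0.9, s));
    }
  }
}
```

Under that assumption, a 5x speedup of the block metadata stage gives
1 / (0.1 + 0.9 / 5) = ~3.57x overall, and the overall speedup stays below
10x no matter how fast that stage becomes.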

Additional notes:

- The multithreading approach is implemented for the following code
paths:
  * INVALIDATE (loading from scratch),
  * REFRESH (reusing existing metadata),
  * ALTER TABLE ADD/RECOVER PARTITIONS.

- This patch makes the implementation of ListMap thread-safe since
we use that data structure as shared state between multiple partition
metadata loading threads.
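
For readers unfamiliar with the idea, here is a minimal sketch of one way
to make an index-assigning list/map structure safe to share between
loader threads. It is an illustration only and not the ListMap code from
this patch: the class name and its methods are hypothetical, and the
locking strategy (a ConcurrentHashMap for lookups plus a lock around
insertion) is just one possible choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: assigns a stable, dense index to each distinct entry.
class ThreadSafeListMapSketch<T> {
  // Index -> entry. Guarded by 'this'.
  private final List<T> list = new ArrayList<>();
  // Entry -> index. Safe for lock-free reads.
  private final ConcurrentHashMap<T, Integer> map = new ConcurrentHashMap<>();

  // Returns the index of 't', adding it if it has not been seen before.
  public int getIndex(T t) {
    Integer index = map.get(t);
    if (index != null) return index;
    synchronized (this) {
      // Re-check under the lock: another thread may have added 't' already.
      index = map.get(t);
      if (index == null) {
        list.add(t);
        index = list.size() - 1;
        map.put(t, index);
      }
      return index;
    }
  }

  public synchronized T getEntry(int index) { return list.get(index); }

  public synchronized int size() { return list.size(); }

  public static void main(String[] args) throws InterruptedException {
    final ThreadSafeListMapSketch<String> hosts = new ThreadSafeListMapSketch<>();
    Runnable r = () -> {
      for (int i = 0; i < 1000; i++) hosts.getIndex("host-" + (i % 10));
    };
    Thread t1 = new Thread(r);
    Thread t2 = new Thread(r);
    t1.start(); t2.start();
    t1.join(); t2.join();
    // Ten distinct hosts were registered, regardless of thread interleaving.
    System.out.println(hosts.size());
  }
}
```

The re-check inside the synchronized block is what prevents two threads
from assigning different indexes to the same entry.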

Testing and Results:

- This patch doesn't add any new tests since there is enough test
coverage already. Passed core/exhaustive runs with HDFS/S3.

- We noticed up to ~113x speedup on S3 tables (thread_pool_size=160),
up to ~5x speedup in non-secure HDFS clusters, and up to ~3.7x in secure
HDFS clusters.

- Synthesized the following two large tables on HDFS and S3 and noticed
a significant reduction in the run time of my test DDL queries.

  (1) 100K partitions + 1 million files
  (2) 80 partitions + 250K files

 100K-PARTITIONS-1M-FILES-CUSTOM-11-REFRESH-PARTITION     I -16.4%
 100K-PARTITIONS-1M-FILES-CUSTOM-08-ADD-PARTITION         I -17.25%
 80-PARTITIONS-250K-FILES-11-REFRESH-PARTITION            I -23.57%
 80-PARTITIONS-250K-FILES-S3-08-ADD-PARTITION             I -23.87%
 80-PARTITIONS-250K-FILES-09-INVALIDATE                   I -24.88%
 80-PARTITIONS-250K-FILES-03-RECOVER                      I -35.90%
 80-PARTITIONS-250K-FILES-07-REFRESH                      I -43.03%
 100K-PARTITIONS-1M-FILES-CUSTOM-12-QUERY-PARTITIONS      I -43.93%
 100K-PARTITIONS-1M-FILES-CUSTOM-05-QUERY-AFTER-INV       I -46.59%
 80-PARTITIONS-250K-FILES-10-REFRESH-AFTER-ADD-PARTITION  I -48.71%
 100K-PARTITIONS-1M-FILES-CUSTOM-07-REFRESH               I -49.02%
 80-PARTITIONS-250K-FILES-05-QUERY-AFTER-INV              I -49.05%
 100K-PARTITIONS-1M-FILES-CUSTOM-10-REFRESH-AFTER-ADD-PARTI -51.87%
 80-PARTITIONS-250K-FILES-S3-03-RECOVER                   I -67.17%
 80-PARTITIONS-250K-FILES-S3-05-QUERY-AFTER-INV           I -76.45%
 80-PARTITIONS-250K-FILES-S3-07-REFRESH                   I -87.04%
 80-PARTITIONS-250K-FILES-S3-10-REFRESH-AFTER-ADD-PART    I -88.57%

Change-Id: I07eaa7151dfc4d56da8db8c2654bd65d8f808481
Reviewed-on: http://gerrit.cloudera.org:8080/8235
Reviewed-by: Bharath Vissapragada <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f2cd5bd5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f2cd5bd5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f2cd5bd5

Branch: refs/heads/master
Commit: f2cd5bd516adb4bcb41d28ebed3894edd15fcef6
Parents: bb73a29
Author: Bharath Vissapragada <[email protected]>
Authored: Mon Jul 31 13:10:04 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Oct 26 21:56:25 2017 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog.cc                       |   7 +
 be/src/util/backend-gflag-util.cc               |   5 +
 common/thrift/BackendGflags.thrift              |   4 +
 .../HdfsPartitionLocationCompressor.java        |   2 +-
 .../org/apache/impala/catalog/HdfsTable.java    | 602 ++++++++++++-------
 .../apache/impala/service/BackendConfig.java    |   7 +
 .../impala/service/CatalogOpExecutor.java       |  29 +-
 .../org/apache/impala/service/JniCatalog.java   |   2 +
 .../java/org/apache/impala/util/ListMap.java    |  34 +-
 9 files changed, 456 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2cd5bd5/be/src/catalog/catalog.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index fd5f940..b6dd86a 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -35,6 +35,13 @@ DEFINE_bool(load_catalog_in_background, false,
 DEFINE_int32(num_metadata_loading_threads, 16,
     "(Advanced) The number of metadata loading threads (degree of parallelism) to use "
     "when loading catalog metadata.");
+DEFINE_int32(max_hdfs_partitions_parallel_load, 5,
+    "(Advanced) Number of threads used to load block metadata for HDFS based partitioned "
+    "tables. Due to HDFS architectural limitations, it is unlikely to get a linear "
+    "speed up beyond 5 threads.");
+DEFINE_int32(max_nonhdfs_partitions_parallel_load, 20,
+    "(Advanced) Number of threads used to load block metadata for tables that do not "
+    "support the notion of blocks/storage IDs. Currently supported for S3/ADLS.");
 DEFINE_int32(initial_hms_cnxn_timeout_s, 120,
     "Number of seconds catalogd will wait to establish an initial connection to the HMS "
     "before exiting.");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2cd5bd5/be/src/util/backend-gflag-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index f5d2e64..91095e1 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -30,6 +30,8 @@ DECLARE_bool(load_auth_to_local_rules);
 DECLARE_bool(enable_stats_extrapolation);
 DECLARE_int32(non_impala_java_vlog);
 DECLARE_int32(num_metadata_loading_threads);
+DECLARE_int32(max_hdfs_partitions_parallel_load);
+DECLARE_int32(max_nonhdfs_partitions_parallel_load);
 DECLARE_int32(initial_hms_cnxn_timeout_s);
 DECLARE_int32(kudu_operation_timeout_ms);
 DECLARE_int64(sentry_catalog_polling_frequency_s);
@@ -59,6 +61,9 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
   cfg.__set_kudu_master_hosts(FLAGS_kudu_master_hosts);
   cfg.__set_read_size(FLAGS_read_size);
   cfg.__set_num_metadata_loading_threads(FLAGS_num_metadata_loading_threads);
+  cfg.__set_max_hdfs_partitions_parallel_load(FLAGS_max_hdfs_partitions_parallel_load);
+  cfg.__set_max_nonhdfs_partitions_parallel_load(
+      FLAGS_max_nonhdfs_partitions_parallel_load);
   cfg.__set_initial_hms_cnxn_timeout_s(FLAGS_initial_hms_cnxn_timeout_s);
   cfg.__set_sentry_config(FLAGS_sentry_config);
   // auth_to_local rules are read if --load_auth_to_local_rules is set to true

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2cd5bd5/common/thrift/BackendGflags.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 45c9133..baae9ba 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -58,4 +58,8 @@ struct TBackendGflags {
   18: required bool enable_stats_extrapolation
 
   19: required i64 sentry_catalog_polling_frequency_s
+
+  20: required i32 max_hdfs_partitions_parallel_load
+
+  21: required i32 max_nonhdfs_partitions_parallel_load
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2cd5bd5/fe/src/main/java/org/apache/impala/catalog/HdfsPartitionLocationCompressor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartitionLocationCompressor.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartitionLocationCompressor.java
index ce53c2d..94b9e4d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartitionLocationCompressor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartitionLocationCompressor.java
@@ -51,7 +51,7 @@ class HdfsPartitionLocationCompressor {
   // Construct an HdfsPartitionLocationCompressor with a pre-filled bidirectional map
   // (indexToPrefix_, prefixToIndex_).
   public HdfsPartitionLocationCompressor(
-      int numClusteringColumns, ArrayList<String> prefixes) {
+      int numClusteringColumns, List<String> prefixes) {
     numClusteringColumns_ = numClusteringColumns;
     prefixMap_.populate(prefixes);
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2cd5bd5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 45ebfc9..ab1511b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -29,6 +29,12 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
@@ -113,6 +119,9 @@ public class HdfsTable extends Table {
   // Number of times to retry fetching the partitions from the HMS should an error occur.
   private final static int NUM_PARTITION_FETCH_RETRIES = 5;
 
+  // Maximum number of errors logged when loading partitioned tables.
+  private static final int MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG = 100;
+
   // Table property key for skip.header.line.count
   public static final String TBL_PROP_SKIP_HEADER_LINE_COUNT = "skip.header.line.count";
 
@@ -200,6 +209,75 @@ public class HdfsTable extends Table {
   // and its usage in getFileSystem suggests it should be.
   private static final Configuration CONF = new Configuration();
 
+  private static final int MAX_HDFS_PARTITIONS_PARALLEL_LOAD =
+      BackendConfig.INSTANCE.maxHdfsPartsParallelLoad();
+
+  private static final int MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD =
+      BackendConfig.INSTANCE.maxNonHdfsPartsParallelLoad();
+
+  // File/Block metadata loading stats for a single HDFS path.
+  private class FileMetadataLoadStats {
+    // Path corresponding to this metadata load request.
+    private Path hdfsPath;
+
+    // Number of files for which the metadata was loaded.
+    public int loadedFiles = 0;
+
+    // Number of hidden files excluded from file metadata loading. More details at
+    // isValidDataFile().
+    public int hiddenFiles = 0;
+
+    // Number of files skipped from file metadata loading because the files have not
+    // changed since the last load. More details at hasFileChanged().
+    public int skippedFiles = 0;
+
+    // Number of unknown disk IDs encountered while loading block
+    // metadata for this path.
+    public long unknownDiskIds = 0;
+
+    public FileMetadataLoadStats(Path path) { hdfsPath = path; }
+
+    public String debugString() {
+      Preconditions.checkNotNull(hdfsPath);
+      return String.format("Path: %s: Loaded files: %s Hidden files: %s " +
+          "Skipped files: %s Unknown diskIDs: %s", hdfsPath, loadedFiles, hiddenFiles,
+          skippedFiles, unknownDiskIds);
+    }
+  }
+
+  // A callable implementation of file metadata loading request for a given
+  // HDFS path.
+  public class FileMetadataLoadRequest
+      implements Callable<FileMetadataLoadStats> {
+    private final Path hdfsPath_;
+    // All the partitions mapped to the above path
+    private final List<HdfsPartition> partitionList_;
+    // If set to true, reloads the file metadata only when the files in this path
+    // have changed since last load (more details in hasFileChanged()).
+    private final boolean reuseFileMd_;
+
+    public FileMetadataLoadRequest(Path path, List<HdfsPartition> partitions,
+       boolean reuseFileMd) {
+      hdfsPath_ = path;
+      partitionList_ = partitions;
+      reuseFileMd_ = reuseFileMd;
+    }
+
+    @Override
+    public FileMetadataLoadStats call() throws IOException {
+      FileMetadataLoadStats loadingStats =
+          reuseFileMd_ ? refreshFileMetadata(hdfsPath_, partitionList_) :
+          resetAndLoadFileMetadata(hdfsPath_, partitionList_);
+      return loadingStats;
+    }
+
+    public String debugString() {
+      String loadType = reuseFileMd_? "Refreshed" : "Loaded";
+      return String.format("%s file metadata for path: %s", loadType,
+          hdfsPath_.toString());
+    }
+  }
+
   public HdfsTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
       Db db, String name, String owner) {
     super(msTbl, db, name, owner);
@@ -249,72 +327,168 @@ public class HdfsTable extends Table {
   }
 
   /**
-   * Drops and re-loads the block metadata of all the partitions in 'partitions' that
-   * correspond to the path 'partDir'. 'partDir' may belong to any file system that
+   * Drops and re-loads the file metadata of all the partitions in 'partitions' that
+   * map to the path 'partDir'. 'partDir' may belong to any file system that
    * implements the hadoop's FileSystem interface (like HDFS, S3, ADLS etc.). It involves
    * the following steps:
-   * - Clear the current block metadata of the partitions.
+   * - Clear the current file metadata of the partitions.
    * - Call FileSystem.listFiles() on 'partDir' to fetch the FileStatus and BlockLocations
    *   for each file under it.
    * - For every valid data file, enumerate all its blocks and their corresponding hosts
    *   and disk IDs if the underlying file system supports the block locations API
-   *   (for ex: HDFS). For other file systems (like S3) we synthesize the block metadata
+   *   (for ex: HDFS). For other file systems (like S3) we synthesize the file metadata
    *   manually by splitting the file ranges into fixed size blocks.
    * For filesystems that don't support BlockLocation API, synthesize file blocks
    * by manually splitting the file range into fixed-size blocks.  That way, scan
    * ranges can be derived from file blocks as usual.  All synthesized blocks are given
    * an invalid network address so that the scheduler will treat them as remote.
    */
-  private void loadBlockMetadata(Path partDir, List<HdfsPartition> partitions) {
-    try {
-      // No need to load blocks for empty partitions list.
-      if (partitions == null || partitions.isEmpty()) return;
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Loading block md for " + name_ + " directory " + partDir.toString());
-      }
+  private FileMetadataLoadStats resetAndLoadFileMetadata(
+      Path partDir, List<HdfsPartition> partitions) throws IOException {
+    FileMetadataLoadStats loadStats = new FileMetadataLoadStats(partDir);
+    // No need to load blocks for empty partitions list.
+    if (partitions == null || partitions.isEmpty()) return loadStats;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Loading block md for " + getFullName() + " path: " + partDir.toString());
+    }
 
-      // Clear the state of partitions under dirPath since they are going to be updated
-      // based on the current snapshot of files in the directory.
-      for (HdfsPartition partition: partitions) {
-        partition.setFileDescriptors(new ArrayList<FileDescriptor>());
+    FileSystem fs = partDir.getFileSystem(CONF);
+    boolean synthesizeFileMd = !FileSystemUtil.supportsStorageIds(fs);
+    RemoteIterator<LocatedFileStatus> fileStatusIter =
+        FileSystemUtil.listFiles(fs, partDir, false);
+    if (fileStatusIter == null) return loadStats;
+    Reference<Long> numUnknownDiskIds = new Reference<Long>(Long.valueOf(0));
+    List<FileDescriptor> newFileDescs = Lists.newArrayList();
+    while (fileStatusIter.hasNext()) {
+      LocatedFileStatus fileStatus = fileStatusIter.next();
+      if (!FileSystemUtil.isValidDataFile(fileStatus)) {
+        ++loadStats.hiddenFiles;
+        continue;
+      }
+      FileDescriptor fd;
+      // Block locations are manually synthesized if the underlying fs does not support
+      // the block location API.
+      if (synthesizeFileMd) {
+        fd = FileDescriptor.createWithSynthesizedBlockMd(fileStatus,
+            partitions.get(0).getFileFormat(), hostIndex_);
+      } else {
+        fd = FileDescriptor.create(fileStatus,
+            fileStatus.getBlockLocations(), fs, hostIndex_, numUnknownDiskIds);
       }
+      newFileDescs.add(fd);
+      ++loadStats.loadedFiles;
+    }
+    for (HdfsPartition partition: partitions) partition.setFileDescriptors(newFileDescs);
+    loadStats.unknownDiskIds += numUnknownDiskIds.getRef();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Loaded file metadata for " + getFullName() + " " +
+          loadStats.debugString());
+    }
+    return loadStats;
+  }
+
+  /**
+   * Refreshes file metadata information for 'path'. This method is optimized for
+   * the case where the files in the partition have not changed dramatically. It first
+   * uses a listStatus() call on the partition directory to detect the modified files
+   * (look at hasFileChanged()) and fetches their block locations using the
+   * getFileBlockLocations() method. Our benchmarks suggest that the listStatus() call
+   * is much faster then the listFiles() (up to ~40x faster in some cases).
+   */
+  private FileMetadataLoadStats refreshFileMetadata(
+      Path partDir, List<HdfsPartition> partitions) throws IOException {
+    FileMetadataLoadStats loadStats = new FileMetadataLoadStats(partDir);
+    // No need to load blocks for empty partitions list.
+    if (partitions == null || partitions.isEmpty()) return loadStats;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Refreshing block md for " + getFullName() + " path: " +
+          partDir.toString());
+    }
 
-      FileSystem fs = partDir.getFileSystem(CONF);
-      boolean synthesizeBlockMd = !FileSystemUtil.supportsStorageIds(fs);
-      RemoteIterator<LocatedFileStatus> fileStatusIter =
-          FileSystemUtil.listFiles(fs, partDir, false);
-      if (fileStatusIter == null) return;
-      Reference<Long> numUnknownDiskIds = new Reference<Long>(Long.valueOf(0));
-      while (fileStatusIter.hasNext()) {
-        LocatedFileStatus fileStatus = fileStatusIter.next();
-        if (!FileSystemUtil.isValidDataFile(fileStatus)) continue;
-        FileDescriptor fd = null;
-        // Block locations are manually synthesized if the underlying fs does not support
-        // the block location API.
-        if (synthesizeBlockMd) {
+    // Index the partition file descriptors by their file names for O(1) look ups.
+    // We just pick the first partition to generate the fileDescByName lookup table
+    // since all the partitions map to the same partDir.
+    ImmutableMap<String, FileDescriptor> fileDescsByName = Maps.uniqueIndex(
+          partitions.get(0).getFileDescriptors(), new Function<FileDescriptor, String>() {
+            @Override
+            public String apply(FileDescriptor desc) { return desc.getFileName(); }
+          });
+
+    FileSystem fs = partDir.getFileSystem(CONF);
+    FileStatus[] fileStatuses = FileSystemUtil.listStatus(fs, partDir);
+    if (fileStatuses == null) return loadStats;
+    boolean synthesizeFileMd = !FileSystemUtil.supportsStorageIds(fs);
+    Reference<Long> numUnknownDiskIds = new Reference<Long>(Long.valueOf(0));
+    List<FileDescriptor> newFileDescs = Lists.newArrayList();
+    HdfsFileFormat fileFormat = partitions.get(0).getFileFormat();
+    // If there is a cached partition mapped to this path, we recompute the block
+    // locations even if the underlying files have not changed (hasFileChanged()).
+    // This is done to keep the cached block metadata up to date.
+    boolean isPartitionMarkedCached = false;
+    for (HdfsPartition partition: partitions) {
+      if (partition.isMarkedCached()) {
+        isPartitionMarkedCached = true;
+        break;
+      }
+    }
+    for (FileStatus fileStatus: fileStatuses) {
+      if (!FileSystemUtil.isValidDataFile(fileStatus)) {
+        ++loadStats.hiddenFiles;
+        continue;
+      }
+      String fileName = fileStatus.getPath().getName().toString();
+      FileDescriptor fd = fileDescsByName.get(fileName);
+      if (isPartitionMarkedCached || hasFileChanged(fd, fileStatus)) {
+        if (synthesizeFileMd) {
           fd = FileDescriptor.createWithSynthesizedBlockMd(fileStatus,
-              partitions.get(0).getFileFormat(), hostIndex_);
+              fileFormat, hostIndex_);
         } else {
-          fd = FileDescriptor.create(fileStatus,
-              fileStatus.getBlockLocations(), fs, hostIndex_, numUnknownDiskIds);
-        }
-        Preconditions.checkNotNull(fd);
-        // Update the partitions' metadata that this file belongs to.
-        for (HdfsPartition partition: partitions) {
-          partition.getFileDescriptors().add(fd);
+          BlockLocation[] locations =
+            fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+          fd = FileDescriptor.create(fileStatus, locations, fs, hostIndex_,
+              numUnknownDiskIds);
         }
+        ++loadStats.loadedFiles;
+      } else {
+        ++loadStats.skippedFiles;
       }
+      Preconditions.checkNotNull(fd);
+      newFileDescs.add(fd);
+    }
+    loadStats.unknownDiskIds += numUnknownDiskIds.getRef();
+    for (HdfsPartition partition: partitions) partition.setFileDescriptors(newFileDescs);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Refreshed file metadata for " + getFullName() + " "
+          + loadStats.debugString());
+    }
+    return loadStats;
+  }
 
-      long unknownDiskIdCount = numUnknownDiskIds.getRef();
-      if (unknownDiskIdCount > 0) {
-        if (LOG.isWarnEnabled()) {
-          LOG.warn("Unknown disk id count for filesystem " + fs + ":" +
-              unknownDiskIdCount);
-        }
+  /**
+   * Compares the modification time and file size between the FileDescriptor and the
+   * FileStatus to determine if the file has changed. Returns true if the file has changed
+   * and false otherwise.
+   */
+  private static boolean hasFileChanged(FileDescriptor fd, FileStatus status) {
+    return (fd == null) || (fd.getFileLength() != status.getLen()) ||
+      (fd.getModificationTime() != status.getModificationTime());
+  }
+
+  /**
+   * Helper method to reload the file metadata of a single partition.
+   */
+  private void refreshPartitionFileMetadata(HdfsPartition partition)
+      throws CatalogException {
+    try {
+      Path partDir = partition.getLocationPath();
+      FileMetadataLoadStats stats = refreshFileMetadata(partDir,
+          Lists.newArrayList(partition));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Refreshed file metadata for %s %s",
+              getFullName(), stats.debugString()));
       }
     } catch (IOException e) {
-      throw new RuntimeException("Error loading block metadata for directory "
-          + partDir.toString() + ": " + e.getMessage(), e);
+      throw new CatalogException("Encountered invalid partition path", e);
     }
   }
 
@@ -594,90 +768,111 @@ public class HdfsTable extends Table {
         }
       }
     }
-
-    loadMetadataAndDiskIds(partsByPath);
+    // Load the file metadata from scratch.
+    loadMetadataAndDiskIds(partsByPath, false);
   }
 
   /**
-   * Refreshes block metadata information for 'partition'. This method is optimized for
-   * the case where the files in the partition have not changed dramatically. It first
-   * uses a listStatus() call on the partition directory to detect files with changed
-   * mtime and fetches their block locations using the getFileBlockLocations() method.
-   * Our benchmarks suggest that the listStatus() call is much faster then the listFiles()
-   * (up to ~40x faster in some cases). The initial table load still uses the listFiles()
-   * on the data directory that fetches both the FileStatus as well as BlockLocations in
-   * a single call.
+   * Helper method to load the partition file metadata from scratch. This method is
+   * optimized for loading newly added partitions. For refreshing existing partitions
+   * use refreshPartitionFileMetadata(HdfsPartition).
    */
-  private void refreshFileMetadata(HdfsPartition partition) throws CatalogException {
+  private void resetAndLoadPartitionFileMetadata(HdfsPartition partition) {
     Path partDir = partition.getLocationPath();
-    Preconditions.checkNotNull(partDir);
     try {
-      FileSystem fs = partDir.getFileSystem(CONF);
-      boolean synthesizeBlockMd = !FileSystemUtil.supportsStorageIds(fs);
-      // Index the partition file descriptors by their file names for O(1) look ups.
-      ImmutableMap<String, FileDescriptor> fileDescsByName = Maps.uniqueIndex(
-          partition.getFileDescriptors(), new Function<FileDescriptor, String>() {
-            public String apply(FileDescriptor desc) {
-              return desc.getFileName();
-            }
-          });
-      // Iterate through the current snapshot of the partition directory listing to
-      // figure out files that were newly added/modified.
-      List<FileDescriptor> newFileDescs = Lists.newArrayList();
-      FileStatus[] pathStatus = FileSystemUtil.listStatus(fs, partDir);
-      if (pathStatus != null) {
-        for (FileStatus fileStatus: pathStatus) {
-          if (!FileSystemUtil.isValidDataFile(fileStatus)) continue;
-          String fileName = fileStatus.getPath().getName().toString();
-          FileDescriptor fd = fileDescsByName.get(fileName);
-          if (fd == null || partition.isMarkedCached() ||
-              fd.getFileLength() != fileStatus.getLen() ||
-              fd.getModificationTime() != fileStatus.getModificationTime()) {
-            if (synthesizeBlockMd) {
-              fd = FileDescriptor.createWithSynthesizedBlockMd(fileStatus,
-                  partition.getFileFormat(), hostIndex_);
-            } else {
-              BlockLocation[] locations =
-                  fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
-              fd = FileDescriptor.create(fileStatus, locations, fs, hostIndex_,
-                  new Reference<Long>(Long.valueOf(0)));
-            }
-          }
-          Preconditions.checkNotNull(fd);
-          newFileDescs.add(fd);
-        }
+      FileMetadataLoadStats stats =
+          resetAndLoadFileMetadata(partDir, Lists.newArrayList(partition));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Loaded file metadata for %s %s", getFullName(),
+            stats.debugString()));
       }
-      partition.setFileDescriptors(newFileDescs);
-    } catch(IOException e) {
-      throw new CatalogException("Error loading block metadata for partition " +
-          partition.toString(), e);
+    } catch (Exception e) {
+        LOG.error("Error loading file metadata for path: " + partDir.toString() +
+            ". Partitions file metadata could be partially loaded.", e);
     }
   }
 
   /**
-   * Helper method to load the partition file metadata from scratch. This method is
-   * optimized for loading newly added partitions. For refreshing existing partitions
-   * use refreshFileMetadata(HdfsPartition).
+   * Returns the thread pool size to load the file metadata of this table.
+   * 'numPaths' is the number of paths for which the file metadata should be loaded.
+   *
+   * We use different thread pool sizes for HDFS and non-HDFS tables since the latter
+   * supports much higher throughput of RPC calls for listStatus/listFiles. For
+   * simplicity, the filesystem type is determined based on the table's root path and
+   * not for each partition individually. Based on our experiments, S3 showed a linear
+   * speed up (up to ~100x) with increasing number of loading threads where as the HDFS
+   * throughput was limited to ~5x in un-secure clusters and up to ~3.7x in secure
+   * clusters. We narrowed it down to scalability bottlenecks in HDFS RPC implementation
+   * (HADOOP-14558) on both the server and the client side.
    */
-  private void loadFileMetadataFromScratch(HdfsPartition partition) {
-    HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
-    partsByPath.put(partition.getLocationPath(), Lists.newArrayList(partition));
-    loadMetadataAndDiskIds(partsByPath);
+  private int getLoadingThreadPoolSize(int numPaths) throws CatalogException {
+    Preconditions.checkState(numPaths > 0);
+    FileSystem tableFs;
+    try {
+      tableFs  = (new Path(getLocation())).getFileSystem(CONF);
+    } catch (IOException e) {
+      throw new CatalogException("Invalid table path for table: " + getFullName(), e);
+    }
+    int threadPoolSize = FileSystemUtil.supportsStorageIds(tableFs) ?
+        MAX_HDFS_PARTITIONS_PARALLEL_LOAD : MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD;
+    // Thread pool size need not exceed the number of paths to be loaded.
+    return Math.min(numPaths, threadPoolSize);
   }
 
   /**
    * Helper method to load the block locations for each partition directory in
-   * partsByPath. 'partsByPath' maps each partition directory to the corresponding
-   * HdfsPartition objects.
+   * partsByPath using a thread pool. 'partsByPath' maps each partition directory to
+   * the corresponding HdfsPartition objects. If 'reuseFileMd' is true, the block
+   * metadata is incrementally refreshed, else it is reloaded from scratch.
    */
-  private void loadMetadataAndDiskIds(HashMap<Path, List<HdfsPartition>> partsByPath) {
-    LOG.info(String.format("Loading file and block metadata for %s paths: %s",
-        partsByPath.size(), getFullName()));
-    for (Path partDir: partsByPath.keySet()) {
-      loadBlockMetadata(partDir, partsByPath.get(partDir));
+  private void loadMetadataAndDiskIds(Map<Path, List<HdfsPartition>> partsByPath,
+      boolean reuseFileMd) throws CatalogException {
+    int numPathsToLoad = partsByPath.size();
+    // For tables without partitions we have no file metadata to load.
+    if (numPathsToLoad == 0)  return;
+
+    int threadPoolSize = getLoadingThreadPoolSize(numPathsToLoad);
+    LOG.info(String.format("Loading file and block metadata for %s paths for table %s " +
+        "using a thread pool of size %s", numPathsToLoad, getFullName(),
+        threadPoolSize));
+    ExecutorService partitionLoadingPool = Executors.newFixedThreadPool(threadPoolSize);
+    try {
+      List<Future<FileMetadataLoadStats>> pendingMdLoadTasks = Lists.newArrayList();
+      for (Path p: partsByPath.keySet()) {
+        FileMetadataLoadRequest blockMdLoadReq =
+            new FileMetadataLoadRequest(p, partsByPath.get(p), reuseFileMd);
+        pendingMdLoadTasks.add(partitionLoadingPool.submit(blockMdLoadReq));
+      }
+      // Wait for the partition load tasks to finish.
+      int failedLoadTasks = 0;
+      for (Future<FileMetadataLoadStats> task: pendingMdLoadTasks) {
+        try {
+          FileMetadataLoadStats loadStats = task.get();
+          if (LOG.isTraceEnabled()) LOG.trace(loadStats.debugString());
+        } catch (ExecutionException | InterruptedException e) {
+          ++failedLoadTasks;
+          if (failedLoadTasks <= MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG) {
+            LOG.error("Encountered an error loading block metadata for table: " +
+                getFullName(), e);
+          }
+        }
+      }
+      if (failedLoadTasks > 0) {
+        int errorsNotLogged = failedLoadTasks - MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG;
+        if (errorsNotLogged > 0) {
+          LOG.error(String.format("Error loading file metadata for %s paths for table " +
+              "%s. Only the first %s errors were logged.", failedLoadTasks, getFullName(),
+              MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG));
+        }
+        throw new TableLoadingException(String.format("Failed to load file metadata "
+            + "for %s paths for table %s. Table's file metadata could be partially "
+            + "loaded. Check the Catalog server log for more details.", failedLoadTasks,
+            getFullName()));
+      }
+    } finally {
+      partitionLoadingPool.shutdown();
     }
-    LOG.info(String.format("Loaded file and block metadata for %s paths: %s",
-        partsByPath.size(), getFullName()));
+    LOG.info(String.format("Loaded file and block metadata for %s", getFullName()));
   }
 
   /**
@@ -730,21 +925,47 @@ public class HdfsTable extends Table {
    * Creates a new HdfsPartition object to be added to HdfsTable's partition list.
    * Partitions may be empty, or may not even exist in the filesystem (a partition's
    * location may have been changed to a new path that is about to be created by an
-   * INSERT). Also loads the block metadata for this partition. Returns new partition
+   * INSERT). Also loads the file metadata for this partition. Returns new partition
    * if successful or null if none was created.
    *
    * Throws CatalogException if the supplied storage descriptor contains metadata that
    * Impala can't understand.
    */
-  public HdfsPartition createAndLoadPartition(StorageDescriptor storageDescriptor,
+  public HdfsPartition createAndLoadPartition(
       org.apache.hadoop.hive.metastore.api.Partition msPartition)
       throws CatalogException {
-    HdfsPartition hdfsPartition = createPartition(storageDescriptor, msPartition);
-    loadFileMetadataFromScratch(hdfsPartition);
+    HdfsPartition hdfsPartition = createPartition(msPartition.getSd(), msPartition);
+    resetAndLoadPartitionFileMetadata(hdfsPartition);
     return hdfsPartition;
   }
 
   /**
+   * Same as createAndLoadPartition() but is optimized for loading file metadata of
+   * newly created HdfsPartitions in parallel.
+   */
+  public List<HdfsPartition> createAndLoadPartitions(
+      List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions)
+      throws CatalogException {
+    HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
+    List<HdfsPartition> addedParts = Lists.newArrayList();
+    for (org.apache.hadoop.hive.metastore.api.Partition partition: msPartitions) {
+      HdfsPartition hdfsPartition = createPartition(partition.getSd(), partition);
+      Preconditions.checkNotNull(hdfsPartition);
+      addedParts.add(hdfsPartition);
+      Path partitionPath = hdfsPartition.getLocationPath();
+      List<HdfsPartition> hdfsPartitions = partsByPath.get(partitionPath);
+      if (hdfsPartitions == null) {
+        partsByPath.put(partitionPath, Lists.newArrayList(hdfsPartition));
+      } else {
+        hdfsPartitions.add(hdfsPartition);
+      }
+    }
+    loadMetadataAndDiskIds(partsByPath, false);
+    return addedParts;
+  }
+
+
+  /**
    * Creates a new HdfsPartition from a specified StorageDescriptor and an HMS partition
    * object.
    */
@@ -938,9 +1159,9 @@ public class HdfsTable extends Table {
    * list of partitions for which metadata should be updated. Otherwise, all partition
    * metadata will be updated from the Hive Metastore.
    *
-   * If 'loadFileMetadata' is true, file metadata of the specified partitions are
-   * reloaded from scratch. If 'partitionsToUpdate' is not specified, file metadata of all
-   * the partitions are loaded.
+   * If 'loadParitionFileMetadata' is true, file metadata of the specified partitions
+   * are reloaded from scratch. If 'partitionsToUpdate' is not specified, file metadata
+   * of all the partitions are loaded.
    *
    * If 'loadTableSchema' is true, the table schema is loaded from the Hive Metastore.
    *
@@ -953,9 +1174,9 @@ public class HdfsTable extends Table {
    * metadata cache of the table and trigger a fresh load.
    */
   public void load(boolean reuseMetadata, IMetaStoreClient client,
-      org.apache.hadoop.hive.metastore.api.Table msTbl, boolean loadFileMetadata,
-      boolean loadTableSchema, Set<String> partitionsToUpdate)
-      throws TableLoadingException {
+      org.apache.hadoop.hive.metastore.api.Table msTbl,
+      boolean loadParitionFileMetadata, boolean loadTableSchema,
+      Set<String> partitionsToUpdate) throws TableLoadingException {
     // turn all exceptions into TableLoadingException
     msTable_ = msTbl;
     try {
@@ -971,12 +1192,14 @@ public class HdfsTable extends Table {
       if (reuseMetadata) {
         // Incrementally update this table's partitions and file metadata
         LOG.info("Incrementally loading table metadata for: " + getFullName());
-        Preconditions.checkState(partitionsToUpdate == null || loadFileMetadata);
+        Preconditions.checkState(
+            partitionsToUpdate == null || loadParitionFileMetadata);
         updateMdFromHmsTable(msTbl);
         if (msTbl.getPartitionKeysSize() == 0) {
-          if (loadFileMetadata) updateUnpartitionedTableFileMd();
+          if (loadParitionFileMetadata) updateUnpartitionedTableFileMd();
         } else {
-          updatePartitionsFromHms(client, partitionsToUpdate, loadFileMetadata);
+          updatePartitionsFromHms(
+              client, partitionsToUpdate, loadParitionFileMetadata);
         }
         LOG.info("Incrementally loaded table metadata for: " + getFullName());
       } else {
@@ -995,7 +1218,8 @@ public class HdfsTable extends Table {
     } catch (TableLoadingException e) {
       throw e;
     } catch (Exception e) {
-      throw new TableLoadingException("Failed to load metadata for table: " + name_, e);
+      throw new TableLoadingException("Failed to load metadata for table: "
+          + getFullName(), e);
     }
   }
 
@@ -1020,9 +1244,9 @@ public class HdfsTable extends Table {
   /**
    * Updates the file metadata of an unpartitioned HdfsTable.
    */
-  private void updateUnpartitionedTableFileMd() throws CatalogException {
+  private void updateUnpartitionedTableFileMd() throws Exception {
     if (LOG.isTraceEnabled()) {
-      LOG.trace("update unpartitioned table: " + name_);
+      LOG.trace("update unpartitioned table: " + getFullName());
     }
     resetPartitions();
     org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
@@ -1031,7 +1255,7 @@ public class HdfsTable extends Table {
     HdfsPartition part = createPartition(msTbl.getSd(), null);
     addPartition(part);
     if (isMarkedCached_) part.markCached();
-    refreshFileMetadata(part);
+    refreshPartitionFileMetadata(part);
   }
 
   /**
@@ -1039,17 +1263,18 @@ public class HdfsTable extends Table {
    * Metastore. It reloads partitions that were marked 'dirty' by doing a DROP + CREATE.
    * It removes from this table partitions that no longer exist in the Hive Metastore and
    * adds partitions that were added externally (e.g. using Hive) to the Hive Metastore
-   * but do not exist in this table. If 'loadFileMetadata' is true, it triggers
+   * but do not exist in this table. If 'loadParitionFileMetadata' is true, it triggers
    * file/block metadata reload for the partitions specified in 'partitionsToUpdate', if
    * any, or for all the table partitions if 'partitionsToUpdate' is null.
    */
   private void updatePartitionsFromHms(IMetaStoreClient client,
-      Set<String> partitionsToUpdate, boolean loadFileMetadata) throws Exception {
-    if (LOG.isTraceEnabled()) LOG.trace("Sync table partitions: " + name_);
+      Set<String> partitionsToUpdate, boolean loadParitionFileMetadata)
+      throws Exception {
+    if (LOG.isTraceEnabled()) LOG.trace("Sync table partitions: " + getFullName());
     org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
     Preconditions.checkNotNull(msTbl);
     Preconditions.checkState(msTbl.getPartitionKeysSize() != 0);
-    Preconditions.checkState(loadFileMetadata || partitionsToUpdate == null);
+    Preconditions.checkState(loadParitionFileMetadata || partitionsToUpdate == null);
 
     // Retrieve all the partition names from the Hive Metastore. We need this to
     // identify the delta between partitions of the local HdfsTable and the table entry
@@ -1060,8 +1285,8 @@ public class HdfsTable extends Table {
         client.listPartitionNames(db_.getName(), name_, (short) -1));
     // Names of loaded partitions in this table
     Set<String> partitionNames = Sets.newHashSet();
-    // Partitions for which file metadata must be loaded
-    List<HdfsPartition> partitionsToUpdateFileMd = Lists.newArrayList();
+    // Partitions for which file metadata must be loaded, grouped by partition paths.
+    Map<Path, List<HdfsPartition>> partitionsToUpdateFileMdByPath = Maps.newHashMap();
     // Partitions that need to be dropped and recreated from scratch
     List<HdfsPartition> dirtyPartitions = Lists.newArrayList();
     // Partitions that need to be removed from this table. That includes dirty
@@ -1082,8 +1307,16 @@ public class HdfsTable extends Table {
         // list and loading them from the Hive Metastore.
         dirtyPartitions.add(partition);
       } else {
-        if (partitionsToUpdate == null && loadFileMetadata) {
-          partitionsToUpdateFileMd.add(partition);
+        if (partitionsToUpdate == null && loadParitionFileMetadata) {
+          Path partitionPath = partition.getLocationPath();
+          List<HdfsPartition> partitions =
+            partitionsToUpdateFileMdByPath.get(partitionPath);
+          if (partitions == null) {
+            partitionsToUpdateFileMdByPath.put(
+                partitionPath, Lists.newArrayList(partition));
+          } else {
+            partitions.add(partition);
+          }
         }
       }
       Preconditions.checkNotNull(partition.getCachedMsPartitionDescriptor());
@@ -1108,33 +1341,40 @@ public class HdfsTable extends Table {
     // Load file metadata. Until we have a notification mechanism for when a
     // file changes in hdfs, it is sometimes required to reload all the file
     // descriptors and block metadata of a table (e.g. REFRESH statement).
-    if (loadFileMetadata) {
+    if (loadParitionFileMetadata) {
       if (partitionsToUpdate != null) {
         // Only reload file metadata of partitions specified in 'partitionsToUpdate'
-        Preconditions.checkState(partitionsToUpdateFileMd.isEmpty());
-        partitionsToUpdateFileMd = getPartitionsByName(partitionsToUpdate);
+        Preconditions.checkState(partitionsToUpdateFileMdByPath.isEmpty());
+        partitionsToUpdateFileMdByPath = getPartitionsByPath(partitionsToUpdate);
       }
-      loadPartitionFileMetadata(partitionsToUpdateFileMd);
+      loadMetadataAndDiskIds(partitionsToUpdateFileMdByPath, true);
     }
   }
 
   /**
-   * Returns the HdfsPartition objects associated with the specified list of partition
-   * names.
+   * Given a set of partition names, returns the corresponding HdfsPartition
+   * objects grouped by their base directory path.
    */
-  private List<HdfsPartition> getPartitionsByName(Collection<String> partitionNames) {
-    List<HdfsPartition> partitions = Lists.newArrayList();
+  private HashMap<Path, List<HdfsPartition>> getPartitionsByPath(
+      Collection<String> partitionNames) {
+    HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
     for (String partitionName: partitionNames) {
       String partName = DEFAULT_PARTITION_NAME;
       if (partitionName.length() > 0) {
         // Trim the last trailing char '/' from each partition name
         partName = partitionName.substring(0, partitionName.length()-1);
       }
-      Preconditions.checkState(nameToPartitionMap_.containsKey(partName),
-          "Invalid partition name: " + partName);
-      partitions.add(nameToPartitionMap_.get(partName));
+      HdfsPartition partition = nameToPartitionMap_.get(partName);
+      Preconditions.checkNotNull(partition, "Invalid partition name: " + partName);
+      Path partitionPath = partition.getLocationPath();
+      List<HdfsPartition> partitions = partsByPath.get(partitionPath);
+      if (partitions == null) {
+        partsByPath.put(partitionPath, Lists.newArrayList(partition));
+      } else {
+        partitions.add(partition);
+      }
     }
-    return partitions;
+    return partsByPath;
   }
 
   public void setTableStats(org.apache.hadoop.hive.metastore.api.Table msTbl) {
@@ -1155,10 +1395,9 @@ public class HdfsTable extends Table {
    * Returns whether the table has the 'skip.header.line.count' property set.
    */
   private boolean hasSkipHeaderLineCount() {
-    String key = TBL_PROP_SKIP_HEADER_LINE_COUNT;
     org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
     if (msTbl == null) return false;
-    return msTbl.getParameters().containsKey(key);
+    return msTbl.getParameters().containsKey(TBL_PROP_SKIP_HEADER_LINE_COUNT);
   }
 
   /**
@@ -1180,13 +1419,12 @@ public class HdfsTable extends Table {
   public static int parseSkipHeaderLineCount(Map<String, String> tblProperties,
       StringBuilder error) {
     Preconditions.checkState(tblProperties != null);
-    String key = TBL_PROP_SKIP_HEADER_LINE_COUNT;
-    Preconditions.checkState(tblProperties.containsKey(key));
+    Preconditions.checkState(tblProperties.containsKey(TBL_PROP_SKIP_HEADER_LINE_COUNT));
     // Try to parse.
-    String string_value = tblProperties.get(key);
+    String string_value = tblProperties.get(TBL_PROP_SKIP_HEADER_LINE_COUNT);
     int skipHeaderLineCount = 0;
     String error_msg = String.format("Invalid value for table property %s: %s (value " +
-        "must be an integer >= 0)", key, string_value);
+        "must be an integer >= 0)", TBL_PROP_SKIP_HEADER_LINE_COUNT, string_value);
     try {
       skipHeaderLineCount = Integer.parseInt(string_value);
     } catch (NumberFormatException exc) {
@@ -1341,55 +1579,10 @@ public class HdfsTable extends Table {
         // WRITE_ONLY the table's access level should be NONE.
         accessLevel_ = TAccessLevel.READ_ONLY;
       }
-      refreshFileMetadata(partition);
-    }
-  }
-
-  /**
-   * Loads the file descriptors and block metadata of a list of partitions. This function
-   * is optimized for incremental loading of the partition file metadata. To load it from
-   * scratch, use loadFileMetadataFromScratch(HdfsPartition).
-   */
-  private void loadPartitionFileMetadata(List<HdfsPartition> partitions)
-      throws Exception {
-    Preconditions.checkNotNull(partitions);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(String.format("loading file metadata for %d partitions",
-          partitions.size()));
-    }
-    org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
-    Preconditions.checkNotNull(msTbl);
-    for (HdfsPartition partition: partitions) {
-      org.apache.hadoop.hive.metastore.api.Partition msPart =
-          partition.toHmsPartition();
-      if (msPart != null) {
-        HdfsCachingUtil.validateCacheParams(msPart.getParameters());
-      }
-      StorageDescriptor sd = null;
-      if (msPart == null) {
-        // If this partition is not stored in the Hive Metastore (e.g. default partition
-        // of an unpartitioned table), use the table's storage descriptor to load file
-        // metadata.
-        sd = msTbl.getSd();
-      } else {
-        sd = msPart.getSd();
-      }
-      loadPartitionFileMetadata(sd, partition);
+      refreshPartitionFileMetadata(partition);
     }
   }
 
-  /**
-   * Loads the file descriptors and block metadata of a partition from its
-   * StorageDescriptor. If 'partition' does not have an entry in the Hive Metastore,
-   * 'storageDescriptor' is the StorageDescriptor of the associated table.
-   */
-  private void loadPartitionFileMetadata(StorageDescriptor storageDescriptor,
-      HdfsPartition partition) throws Exception {
-    Preconditions.checkNotNull(storageDescriptor);
-    Preconditions.checkNotNull(partition);
-    refreshFileMetadata(partition);
-  }
-
   @Override
   protected List<String> getColumnNamesWithHmsStats() {
     List<String> ret = Lists.newArrayList();
@@ -1405,15 +1598,13 @@ public class HdfsTable extends Table {
       throws TableLoadingException {
     super.loadFromThrift(thriftTable);
     THdfsTable hdfsTable = thriftTable.getHdfs_table();
-    Preconditions.checkState(hdfsTable.getPartition_prefixes() instanceof ArrayList<?>);
     partitionLocationCompressor_ = new HdfsPartitionLocationCompressor(
-        numClusteringCols_, (ArrayList<String>)hdfsTable.getPartition_prefixes());
+        numClusteringCols_, hdfsTable.getPartition_prefixes());
     hdfsBaseDir_ = hdfsTable.getHdfsBaseDir();
     nullColumnValue_ = hdfsTable.nullColumnValue;
     nullPartitionKeyValue_ = hdfsTable.nullPartitionKeyValue;
     multipleFileSystems_ = hdfsTable.multiple_filesystems;
-    Preconditions.checkState(hdfsTable.getNetwork_addresses() instanceof ArrayList<?>);
-    hostIndex_.populate((ArrayList<TNetworkAddress>)hdfsTable.getNetwork_addresses());
+    hostIndex_.populate(hdfsTable.getNetwork_addresses());
     resetPartitions();
     try {
       for (Map.Entry<Long, THdfsPartition> part: hdfsTable.getPartitions().entrySet()) {
@@ -1430,7 +1621,8 @@ public class HdfsTable extends Table {
   }
 
   @Override
-  public TTableDescriptor toThriftDescriptor(int tableId, Set<Long> referencedPartitions) {
+  public TTableDescriptor toThriftDescriptor(int tableId,
+      Set<Long> referencedPartitions) {
     // Create thrift descriptors to send to the BE.  The BE does not
     // need any information below the THdfsPartition level.
     TTableDescriptor tableDesc = new TTableDescriptor(tableId, TTableType.HDFS_TABLE,
@@ -1856,7 +2048,7 @@ public class HdfsTable extends Table {
       throws CatalogException {
     HdfsPartition refreshedPartition = createPartition(
         hmsPartition.getSd(), hmsPartition);
-    refreshFileMetadata(refreshedPartition);
+    refreshPartitionFileMetadata(refreshedPartition);
     Preconditions.checkArgument(oldPartition == null
         || oldPartition.compareTo(refreshedPartition) == 0);
     dropPartition(oldPartition);
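
The pool-based loading pattern used by loadMetadataAndDiskIds() above can be
looked at in isolation. What follows is only a minimal sketch, not the Impala
code: the class ParallelLoadSketch, the method runAll(), the String-returning
tasks and the constant MAX_ERRORS_TO_LOG are hypothetical stand-ins for
FileMetadataLoadRequest, FileMetadataLoadStats and
MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG. It shows the same steps: cap the pool
size at the number of tasks, submit everything, wait on each Future, log only
the first few failures, and always shut the pool down.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ParallelLoadSketch {
  // Hypothetical cap on how many individual task failures get logged.
  static final int MAX_ERRORS_TO_LOG = 2;

  // Runs all tasks on a fixed-size pool and returns the number of failed tasks.
  static int runAll(List<Callable<String>> tasks, int maxThreads) {
    if (tasks.isEmpty()) return 0;
    // The pool never needs more threads than there are tasks.
    int poolSize = Math.min(tasks.size(), maxThreads);
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    try {
      List<Future<String>> pending = new ArrayList<>();
      for (Callable<String> task : tasks) pending.add(pool.submit(task));
      int failed = 0;
      // Wait for every task; a failure in one does not stop the others.
      for (Future<String> future : pending) {
        try {
          future.get();
        } catch (ExecutionException | InterruptedException e) {
          ++failed;
          if (failed <= MAX_ERRORS_TO_LOG) {
            System.err.println("Task failed: " + e);
          }
        }
      }
      return failed;
    } finally {
      // Lets already-submitted tasks finish and releases the worker threads.
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    List<Callable<String>> tasks = new ArrayList<>();
    for (int i = 0; i < 6; ++i) {
      final int id = i;
      tasks.add(() -> {
        // Odd-numbered tasks simulate a path that cannot be listed.
        if (id % 2 == 1) throw new IllegalStateException("cannot list path " + id);
        return "path " + id;
      });
    }
    System.out.println("failed tasks: " + runAll(tasks, 4));
  }
}
```

In the patch itself a non-zero failure count is turned into a
TableLoadingException. The sketch returns the count instead so that the
control flow stays visible.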

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2cd5bd5/fe/src/main/java/org/apache/impala/service/BackendConfig.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 28e5fa3..af05ad6 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -64,6 +64,13 @@ public class BackendConfig {
     return backendCfg_.sentry_catalog_polling_frequency_s;
   }
 
+  public int maxHdfsPartsParallelLoad() {
+    return backendCfg_.max_hdfs_partitions_parallel_load;
+  }
+
+  public int maxNonHdfsPartsParallelLoad() {
+    return backendCfg_.max_nonhdfs_partitions_parallel_load;
+  }
   // Inits the auth_to_local configuration in the static KerberosName class.
   private static void initAuthToLocal() {
     // If auth_to_local is enabled, we read the configuration hadoop.security.auth_to_local

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2cd5bd5/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 0ae9a25..bb466dd 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -613,15 +613,22 @@ public class CatalogOpExecutor {
    */
   private Table addHdfsPartition(Table tbl, Partition partition)
       throws CatalogException {
+    return addHdfsPartitions(tbl, Lists.newArrayList(partition));
+  }
+
+  private Table addHdfsPartitions(Table tbl, List<Partition> partitions)
+      throws CatalogException {
     Preconditions.checkNotNull(tbl);
-    Preconditions.checkNotNull(partition);
+    Preconditions.checkNotNull(partitions);
     if (!(tbl instanceof HdfsTable)) {
       throw new CatalogException("Table " + tbl.getFullName() + " is not an HDFS table");
     }
     HdfsTable hdfsTable = (HdfsTable) tbl;
-    HdfsPartition hdfsPartition =
-        hdfsTable.createAndLoadPartition(partition.getSd(), partition);
-    return catalog_.addPartition(hdfsPartition);
+    List<HdfsPartition> hdfsPartitions = hdfsTable.createAndLoadPartitions(partitions);
+    for (HdfsPartition hdfsPartition: hdfsPartitions) {
+      catalog_.addPartition(hdfsPartition);
+    }
+    return hdfsTable;
   }
 
   /**
@@ -1979,12 +1986,7 @@ public class CatalogOpExecutor {
         addedHmsPartitions.addAll(
             getPartitionsFromHms(msTbl, msClient, tableName, difference));
       }
-
-      for (Partition partition: addedHmsPartitions) {
-        // Create and add the HdfsPartition to catalog. Return the table object with an
-        // updated catalog version.
-        addHdfsPartition(tbl, partition);
-      }
+      addHdfsPartitions(tbl, addedHmsPartitions);
     }
     return tbl;
   }
@@ -2614,12 +2616,7 @@ public class CatalogOpExecutor {
         // ifNotExists and needResults are true.
         List<Partition> hmsAddedPartitions =
             msClient.getHiveClient().add_partitions(hmsSublist, true, true);
-        for (Partition partition: hmsAddedPartitions) {
-          // Create and add the HdfsPartition. Return the table object with an updated
-          // catalog version.
-          addHdfsPartition(tbl, partition);
-        }
-
+        addHdfsPartitions(tbl, hmsAddedPartitions);
         // Handle HDFS cache.
         if (cachePoolName != null) {
           for (Partition partition: hmsAddedPartitions) {
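
addHdfsPartitions() now hands the whole batch to
HdfsTable.createAndLoadPartitions(), which groups the new partitions by
location path before loading them. The grouping step on its own might look
like the sketch below. It is an illustration under assumptions, not the
patch's code: GroupByPathSketch and Part are hypothetical, plain strings stand
in for org.apache.hadoop.fs.Path, and Map.computeIfAbsent() (Java 8) replaces
the explicit get/null-check/put sequence the patch uses.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GroupByPathSketch {
  // Hypothetical stand-in for HdfsPartition: a partition name and its location.
  static final class Part {
    final String name;
    final String location;
    Part(String name, String location) {
      this.name = name;
      this.location = location;
    }
  }

  // Groups partitions by location so that each directory is listed only once,
  // even when several partitions point at it.
  static Map<String, List<Part>> groupByLocation(List<Part> parts) {
    Map<String, List<Part>> byLocation = new HashMap<>();
    for (Part part : parts) {
      byLocation.computeIfAbsent(part.location, k -> new ArrayList<>()).add(part);
    }
    return byLocation;
  }

  public static void main(String[] args) {
    List<Part> parts = new ArrayList<>();
    parts.add(new Part("p=1", "/tbl/p=1"));
    parts.add(new Part("p=2", "/tbl/shared"));
    parts.add(new Part("p=3", "/tbl/shared"));
    Map<String, List<Part>> grouped = groupByLocation(parts);
    for (Map.Entry<String, List<Part>> e : grouped.entrySet()) {
      System.out.println(e.getKey() + " -> " + e.getValue().size() + " partition(s)");
    }
  }
}
```

Each map entry then corresponds to one load task, so the number of entries is
what bounds the useful thread pool size.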

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2cd5bd5/fe/src/main/java/org/apache/impala/service/JniCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index f8f419a..e945a3b 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -88,6 +88,8 @@ public class JniCatalog {
     BackendConfig.create(cfg);
 
     Preconditions.checkArgument(cfg.num_metadata_loading_threads > 0);
+    Preconditions.checkArgument(cfg.max_hdfs_partitions_parallel_load > 0);
+    Preconditions.checkArgument(cfg.max_nonhdfs_partitions_parallel_load > 0);
     Preconditions.checkArgument(cfg.initial_hms_cnxn_timeout_s > 0);
     // This trick saves having to pass a TLogLevel enum, which is an object and more
     // complex to pass through JNI.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2cd5bd5/fe/src/main/java/org/apache/impala/util/ListMap.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/ListMap.java b/fe/src/main/java/org/apache/impala/util/ListMap.java
index 07de611..db3942b 100644
--- a/fe/src/main/java/org/apache/impala/util/ListMap.java
+++ b/fe/src/main/java/org/apache/impala/util/ListMap.java
@@ -19,28 +19,29 @@ package org.apache.impala.util;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Lists;
+import com.google.common.collect.ImmutableList;
 
 /**
- * Implementation of a bi-directional map between an index of type
- * Integer and an object of type T.  The indices are allocated on
- * demand when a reverse lookup occurs for an object not already in
- * the map.
+ * Thread-safe implementation of a bi-directional map between an
+ * index of type Integer and an object of type T.  The indices are
+ * allocated on demand when a reverse lookup occurs for an object
+ * not already in the map.
  *
  * The forward mapping is implemented as a List<> so that it can be
  * directly used as a Thrift structure.
  */
 public class ListMap<T> {
   // Maps from Integer to T.
-  private ArrayList<T> list_ = Lists.newArrayList();
+  private List<T> list_ = Collections.synchronizedList(new ArrayList<T>());
   // Maps from T to Integer.
-  private final Map<T, Integer> map_ = Maps.newHashMap();
+  private final ConcurrentHashMap<T, Integer> map_ =
+      new ConcurrentHashMap<T, Integer> ();
 
-  public ArrayList<T> getList() { return list_; }
+  public List<T> getList() { return ImmutableList.copyOf(list_); }
   public int size() { return list_.size(); }
 
   /**
@@ -54,8 +55,13 @@ public class ListMap<T> {
    */
   public int getIndex(T t) {
     Integer index = map_.get(t);
-    if (index == null) {
-      // No match was found, add a new entry.
+    if (index != null) return index;
+    // No match was found, add a new entry.
+    synchronized (this) {
+      // Another thread may have generated the new index, if yes
+      // return that.
+      index = map_.get(t);
+      if (index !=null) return index;
       list_.add(t);
       index = list_.size() - 1;
       map_.put(t, index);
@@ -67,9 +73,9 @@ public class ListMap<T> {
    * Populate the bi-map from the given list.  Does not perform a copy
    * of the list.
    */
-  public void populate(ArrayList<T> list) {
+  public synchronized void populate(List<T> list) {
     Preconditions.checkState(list_.isEmpty() && map_.isEmpty());
-    list_ = list;
+    list_ = Collections.synchronizedList(list);
     for (int i = 0; i < list_.size(); ++i) {
       map_.put(list_.get(i), i);
     }
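
The getIndex() change above follows a check, lock, re-check pattern: a
lock-free lookup first, then a synchronized block that looks again before
allocating a new index. The sketch below reproduces that idea with JDK classes
only. IndexMapSketch and its method names are hypothetical, and it leaves out
populate() and the ImmutableList copy that ListMap.getList() returns.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

class IndexMapSketch<T> {
  // Forward mapping: index -> value.
  private final List<T> list = Collections.synchronizedList(new ArrayList<T>());
  // Reverse mapping: value -> index.
  private final ConcurrentHashMap<T, Integer> map = new ConcurrentHashMap<>();

  // Returns the index of 't', allocating the next free index on first use.
  public int getIndex(T t) {
    // Fast path: no lock is taken when the value is already known.
    Integer index = map.get(t);
    if (index != null) return index;
    synchronized (this) {
      // Another thread may have added 't' while we waited for the lock.
      index = map.get(t);
      if (index != null) return index;
      list.add(t);
      index = list.size() - 1;
      map.put(t, index);
      return index;
    }
  }

  public T getEntry(int index) { return list.get(index); }

  public int size() { return list.size(); }

  public static void main(String[] args) throws InterruptedException {
    final IndexMapSketch<String> hosts = new IndexMapSketch<>();
    Thread[] workers = new Thread[8];
    for (int w = 0; w < workers.length; ++w) {
      // Every worker asks for the same 100 keys to provoke contention.
      workers[w] = new Thread(() -> {
        for (int i = 0; i < 100; ++i) hosts.getIndex("host-" + i);
      });
      workers[w].start();
    }
    for (Thread worker : workers) worker.join();
    System.out.println("distinct entries: " + hosts.size());
  }
}
```

Because the list append and the map insert happen under the same lock, two
threads that ask for the same new value cannot end up with different indices.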
