IMPALA-5431: Remove redundant path exists checks during table load

There are multiple places that do an exists() check on a path and then
perform some subsequent action on it. This pattern results in two
RPCs to the NN (one for the exists() check and one for the subsequent
action). We can avoid the exists() check in these cases since most HDFS
methods on paths throw a FileNotFoundException if the path does not
exist. This saves an RPC to the NN and improves the metadata loading time.
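
For illustration only (not part of the patch), the sketch below contrasts the
redundant pattern with the FileNotFoundException-based approach used by the new
FileSystemUtil.listStatus()/listFiles() wrappers; the surrounding variables are
hypothetical:

  // Before: two NN RPCs per directory listing.
  FileSystem fs = partDir.getFileSystem(CONF);
  if (fs.exists(partDir)) {                          // RPC #1
    FileStatus[] statuses = fs.listStatus(partDir);  // RPC #2
    // ... process statuses ...
  }

  // After: a single RPC; a missing path surfaces as FileNotFoundException.
  try {
    FileStatus[] statuses = fs.listStatus(partDir);  // RPC #1
    // ... process statuses ...
  } catch (FileNotFoundException e) {
    // Path does not exist; treat as an empty listing.
  }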

Testing: Existing tests already cover these code paths. This patch
passed core and exhaustive test runs.

The metadata benchmark shows a decent improvement in several operations, for example:

100K-PARTITIONS-1M-FILES-CUSTOM-05-QUERY-AFTER-INV -20.51%
80-PARTITIONS-250K-FILES-S3-03-RECOVER -20.58%
80-PARTITIONS-250K-FILES-11-DROP-PARTITION -22.13%
80-PARTITIONS-250K-FILES-S3-08-ADD-PARTITION -22.38%
80-PARTITIONS-250K-FILES-S3-12-DROP -23.69%
100K-PARTITIONS-1M-FILES-CUSTOM-11-REFRESH-PARTITION -23.91%
100K-PARTITIONS-1M-FILES-CUSTOM-10-REFRESH-AFTER-ADD-PARTITION -26.04%
100K-PARTITIONS-1M-FILES-CUSTOM-07-REFRESH -26.38%
80-PARTITIONS-250K-FILES-S3-02-CREATE -36.47%
100K-PARTITIONS-1M-FILES-CUSTOM-12-QUERY-PARTITIONS -58.72%
80-PARTITIONS-250K-FILES-S3-01-DROP -95.33%
80-PARTITIONS-250K-FILES-01-DROP -95.93%

Change-Id: Id10ecf64ea2eda2d0f9299c0aa371933eca22281
Reviewed-on: http://gerrit.cloudera.org:8080/7095
Reviewed-by: Bharath Vissapragada <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d9fc9be0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d9fc9be0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d9fc9be0

Branch: refs/heads/master
Commit: d9fc9be02195605a63365d6171b75e95e646dab7
Parents: 466808a
Author: Bharath Vissapragada <[email protected]>
Authored: Mon Jun 5 16:04:46 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue Jun 27 03:40:52 2017 +0000

----------------------------------------------------------------------
 .../org/apache/impala/catalog/HdfsTable.java    | 68 +++++++++-----------
 .../apache/impala/common/FileSystemUtil.java    | 35 +++++++++-
 2 files changed, 64 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d9fc9be0/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index bbdbd16..6aca8dc 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.catalog;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -262,9 +263,8 @@ public class HdfsTable extends Table {
   private void loadBlockMetadata(Path dirPath,
       HashMap<Path, List<HdfsPartition>> partsByPath) {
     try {
-      FileSystem fs = dirPath.getFileSystem(CONF);
       // No need to load blocks for empty partitions list.
-      if (partsByPath.size() == 0 || !fs.exists(dirPath)) return;
+      if (partsByPath.size() == 0) return;
       if (LOG.isTraceEnabled()) {
         LOG.trace("Loading block md for " + name_ + " directory " + 
dirPath.toString());
       }
@@ -291,6 +291,7 @@ public class HdfsTable extends Table {
         }
       }
 
+      FileSystem fs = dirPath.getFileSystem(CONF);
       // For file systems that do not support BlockLocation API, we manually synthesize
       // block location metadata based on file formats.
       if (!FileSystemUtil.supportsStorageIds(fs)) {
@@ -298,8 +299,10 @@ public class HdfsTable extends Table {
         return;
       }
 
+      RemoteIterator<LocatedFileStatus> fileStatusIter =
+          FileSystemUtil.listFiles(fs, dirPath, true);
+      if (fileStatusIter == null) return;
       Reference<Long> numUnknownDiskIds = new Reference<Long>(Long.valueOf(0));
-      RemoteIterator<LocatedFileStatus> fileStatusIter = fs.listFiles(dirPath, true);
       while (fileStatusIter.hasNext()) {
         LocatedFileStatus fileStatus = fileStatusIter.next();
         if (!FileSystemUtil.isValidDataFile(fileStatus)) continue;
@@ -360,7 +363,9 @@ public class HdfsTable extends Table {
    */
   private void synthesizeBlockMetadata(FileSystem fs, Path dirPath, HashMap<Path,
       List<HdfsPartition>> partsByPath) throws IOException {
-    RemoteIterator<LocatedFileStatus> fileStatusIter = fs.listFiles(dirPath, true);
+    RemoteIterator<LocatedFileStatus> fileStatusIter =
+        FileSystemUtil.listFiles(fs, dirPath, true);
+    if (fileStatusIter == null) return;
     while (fileStatusIter.hasNext()) {
       LocatedFileStatus fileStatus = fileStatusIter.next();
       if (!FileSystemUtil.isValidDataFile(fileStatus)) continue;
@@ -644,9 +649,7 @@ public class HdfsTable extends Table {
       if (isMarkedCached_) part.markCached();
       addPartition(part);
       FileSystem fs = tblLocation.getFileSystem(CONF);
-      if (fs.exists(tblLocation)) {
-        accessLevel_ = getAvailableAccessLevel(fs, tblLocation);
-      }
+      accessLevel_ = getAvailableAccessLevel(fs, tblLocation);
     } else {
       for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) {
         HdfsPartition partition = createPartition(msPartition.getSd(), msPartition);
@@ -704,10 +707,6 @@ public class HdfsTable extends Table {
     Preconditions.checkNotNull(partDir);
     try {
       FileSystem fs = partDir.getFileSystem(CONF);
-      if (!fs.exists(partDir)) {
-        partition.setFileDescriptors(new ArrayList<FileDescriptor>());
-        return;
-      }
       if (!FileSystemUtil.supportsStorageIds(fs)) {
         synthesizeBlockMetadata(fs, partition);
         return;
@@ -722,21 +721,22 @@ public class HdfsTable extends Table {
       // Iterate through the current snapshot of the partition directory listing to
       // figure out files that were newly added/modified.
       List<FileDescriptor> newFileDescs = Lists.newArrayList();
-      long newPartSizeBytes = 0;
-      for (FileStatus fileStatus : fs.listStatus(partDir)) {
-        if (!FileSystemUtil.isValidDataFile(fileStatus)) continue;
-        String fileName = fileStatus.getPath().getName().toString();
-        FileDescriptor fd = fileDescsByName.get(fileName);
-        if (fd == null || partition.isMarkedCached() ||
-            fd.getFileLength() != fileStatus.getLen() ||
-            fd.getModificationTime() != fileStatus.getModificationTime()) {
-          BlockLocation[] locations =
-              fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
-          fd = FileDescriptor.create(fileStatus, locations, fs, hostIndex_,
-              new Reference<Long>(Long.valueOf(0)));
+      FileStatus[] pathStatus = FileSystemUtil.listStatus(fs, partDir);
+      if (pathStatus != null) {
+        for (FileStatus fileStatus: pathStatus) {
+          if (!FileSystemUtil.isValidDataFile(fileStatus)) continue;
+          String fileName = fileStatus.getPath().getName().toString();
+          FileDescriptor fd = fileDescsByName.get(fileName);
+          if (fd == null || partition.isMarkedCached() ||
+              fd.getFileLength() != fileStatus.getLen() ||
+              fd.getModificationTime() != fileStatus.getModificationTime()) {
+            BlockLocation[] locations =
+                fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+            fd = FileDescriptor.create(fileStatus, locations, fs, hostIndex_,
+                new Reference<Long>(Long.valueOf(0)));
+          }
+          newFileDescs.add(fd);
         }
-        newFileDescs.add(fd);
-        newPartSizeBytes += fileStatus.getLen();
       }
       partition.setFileDescriptors(newFileDescs);
     } catch(IOException e) {
@@ -800,7 +800,7 @@ public class HdfsTable extends Table {
 
     FsPermissionChecker permissionChecker = FsPermissionChecker.getInstance();
     while (location != null) {
-      if (fs.exists(location)) {
+      try {
         FsPermissionChecker.Permissions perms =
             permissionChecker.getPermissions(fs, location);
         if (perms.canReadAndWrite()) {
@@ -811,8 +811,9 @@ public class HdfsTable extends Table {
           return TAccessLevel.WRITE_ONLY;
         }
         return TAccessLevel.NONE;
+      } catch (FileNotFoundException e) {
+        location = location.getParent();
       }
-      location = location.getParent();
     }
     // Should never get here.
     Preconditions.checkNotNull(location, "Error: no path ancestor exists");
@@ -1105,9 +1106,7 @@ public class HdfsTable extends Table {
     if (msTbl.getPartitionKeysSize() == 0) {
       Path location = new Path(hdfsBaseDir_);
       FileSystem fs = location.getFileSystem(CONF);
-      if (fs.exists(location)) {
-        accessLevel_ = getAvailableAccessLevel(fs, location);
-      }
+      accessLevel_ = getAvailableAccessLevel(fs, location);
     }
     setMetaStoreTable(msTbl);
   }
@@ -1482,9 +1481,6 @@ public class HdfsTable extends Table {
       HdfsPartition partition) throws Exception {
     Preconditions.checkNotNull(storageDescriptor);
     Preconditions.checkNotNull(partition);
-    Path partDirPath = new Path(storageDescriptor.getLocation());
-    FileSystem fs = partDirPath.getFileSystem(CONF);
-    if (!fs.exists(partDirPath)) return;
     refreshFileMetadata(partition);
   }
 
@@ -1661,9 +1657,6 @@ public class HdfsTable extends Table {
       HashSet<List<LiteralExpr>> existingPartitions,
       List<List<String>> partitionsNotInHms) throws IOException {
     FileSystem fs = path.getFileSystem(CONF);
-    // Check whether the base directory exists.
-    if (!fs.exists(path)) return;
-
     List<String> partitionValues = Lists.newArrayList();
     List<LiteralExpr> partitionExprs = Lists.newArrayList();
     getAllPartitionsNotInHms(path, partitionKeys, 0, fs, partitionValues,
@@ -1701,7 +1694,8 @@ public class HdfsTable extends Table {
       return;
     }
 
-    FileStatus[] statuses = fs.listStatus(path);
+    FileStatus[] statuses = FileSystemUtil.listStatus(fs, path);
+    if (statuses == null) return;
     for (FileStatus status: statuses) {
       if (!status.isDirectory()) continue;
       Pair<String, LiteralExpr> keyValues =

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d9fc9be0/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index 9ae4269..2841993 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -29,14 +29,17 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.adl.AdlFileSystem;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.impala.catalog.HdfsCompression;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
@@ -46,7 +49,7 @@ import com.google.common.base.Preconditions;
  */
 public class FileSystemUtil {
   private static final Configuration CONF = new Configuration();
-  private static final Logger LOG = Logger.getLogger(FileSystemUtil.class);
+  private static final Logger LOG = LoggerFactory.getLogger(FileSystemUtil.class);
 
   /**
    * Performs a non-recursive delete of all visible (non-hidden) files in a given
@@ -483,4 +486,32 @@ public class FileSystemUtil {
         FileSystemUtil.isS3AFileSystem(path) ||
         FileSystemUtil.isADLFileSystem(path));
   }
+
+  /**
+   * Wrapper around FileSystem.listStatus() that specifically handles the case when
+   * the path does not exist. This helps simplify the caller code in cases where
+   * the file does not exist and also saves an RPC as the caller need not do a separate
+   * exists check for the path. Returns null if the path does not exist.
+   */
+  public static FileStatus[] listStatus(FileSystem fs, Path p) throws IOException {
+    try {
+      return fs.listStatus(p);
+    } catch (FileNotFoundException e) {
+      if (LOG.isWarnEnabled()) LOG.warn("Path does not exist: " + p.toString(), e);
+      return null;
+    }
+  }
+
+  /**
+   * Wrapper around FileSystem.listFiles(), similar to the listStatus() wrapper above.
+   */
+  public static RemoteIterator<LocatedFileStatus> listFiles(FileSystem fs, Path p,
+      boolean recursive) throws IOException {
+    try {
+      return fs.listFiles(p, recursive);
+    } catch (FileNotFoundException e) {
+      if (LOG.isWarnEnabled()) LOG.warn("Path does not exist: " + p.toString(), e);
+      return null;
+    }
+  }
 }
