IMPALA-4840: Fix REFRESH performance regression.

The fix for IMPALA-4172 introduced a performance regression in the
REFRESH command. The regression stems from the fact that we reload the
block metadata of every valid data file without considering whether it
has changed since the last load. This caused unnecessary metadata loads
for unchanged files, thus increasing the runtime.

The fix involves having the refresh code path (and other operations that
use the same code path, such as INSERT) reload the metadata of only the
modified files, by doing a listStatus() on the partition directory and
checking the last modified time of each file. Without this patch, we
relied on listFiles(), which fetched the block locations regardless of
whether a file had changed, and was significantly slower on unchanged
tables. The initial/invalidate metadata load still fetches the block
locations in bulk using listFiles(). A side effect of this change is
that refresh no longer picks up block location changes after HDFS block
rebalancing; we suggest using "invalidate metadata" for that case, which
loads the metadata from scratch. Additionally, this commit enables the
reuse of metadata during table refresh (which was disabled in
IMPALA-4172) to avoid reloading it from the HMS every time.

Change-Id: I859b9fe93563ba886d0b5db6db42a14c88caada8
Reviewed-on: http://gerrit.cloudera.org:8080/6009
Reviewed-by: Dimitris Tsirogiannis <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/26eaa266
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/26eaa266
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/26eaa266

Branch: refs/heads/master
Commit: 26eaa266092a5d8b37e21fd19dfbae81a952ac74
Parents: bd1d445
Author: Bharath Vissapragada <[email protected]>
Authored: Thu Feb 9 22:54:40 2017 -0800
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Feb 16 04:52:54 2017 +0000

----------------------------------------------------------------------
 .../impala/catalog/CatalogServiceCatalog.java |   2 +-
 .../org/apache/impala/catalog/HdfsTable.java  | 140 +++++++++++++++----
 2 files changed, 110 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/26eaa266/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 2c42874..8be0aa3 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -974,7 +974,7 @@ public class CatalogServiceCatalog extends Catalog {
         throw new TableLoadingException("Error loading metadata for table: " +
             db.getName() + "." + tblName.getTable_name(), e);
       }
-      tbl.load(false, msClient.getHiveClient(), msTbl);
+      tbl.load(true, msClient.getHiveClient(), msTbl);
     }
     tbl.setCatalogVersion(newCatalogVersion);
     LOG.info(String.format("Refreshed table metadata: %s", tbl.getFullName()));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/26eaa266/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 795dae2..6096ba9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -81,8 +81,10 @@ import org.apache.impala.util.TResultRowBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -311,28 +313,10 @@ public class HdfsTable extends Table {
       FileDescriptor fd = new FileDescriptor(fileName, fileStatus.getLen(),
           fileStatus.getModificationTime());
       BlockLocation[] locations = fileStatus.getBlockLocations();
-      String partPathDirName = partPathDir.toString();
-      for (BlockLocation loc: locations) {
-        Set<String> cachedHosts = Sets.newHashSet(loc.getCachedHosts());
-        // Enumerate all replicas of the block, adding any unknown hosts
-        // to hostIndex_. We pick the network address from getNames() and
-        // map it to the corresponding hostname from getHosts().
-        List<BlockReplica> replicas = Lists.newArrayListWithExpectedSize(
-            loc.getNames().length);
-        for (int i = 0; i < loc.getNames().length; ++i) {
-          TNetworkAddress networkAddress =
-              BlockReplica.parseLocation(loc.getNames()[i]);
-          replicas.add(new BlockReplica(hostIndex_.getIndex(networkAddress),
-              cachedHosts.contains(loc.getHosts()[i])));
-        }
-        FileBlock currentBlock =
-            new FileBlock(loc.getOffset(), loc.getLength(), replicas);
-        THdfsFileBlock tHdfsFileBlock = currentBlock.toThrift();
-        fd.addThriftFileBlock(tHdfsFileBlock);
-        unknownDiskIdCount += loadDiskIds(loc, tHdfsFileBlock);
-      }
+      unknownDiskIdCount += setFdBlockMetadata(fd, locations);
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Adding file md dir: " + partPathDirName + " file: " + fileName);
+        LOG.trace("Adding file md dir: " + partPathDir.toString() + " file: " +
+            fileName);
       }
       // Update the partitions' metadata that this file belongs to.
       for (HdfsPartition partition: partitions) {
@@ -354,6 +338,35 @@ public class HdfsTable extends Table {
   }
 
   /**
+   * Sets the block metadata for FileDescriptor 'fd' using block location metadata
+   * from 'locations'.
+   */
+  private int setFdBlockMetadata(FileDescriptor fd, BlockLocation[] locations)
+      throws IOException {
+    int unknownFdDiskIds = 0;
+    for (BlockLocation loc: locations) {
+      Set<String> cachedHosts = Sets.newHashSet(loc.getCachedHosts());
+      // Enumerate all replicas of the block, adding any unknown hosts
+      // to hostIndex_. We pick the network address from getNames() and
+      // map it to the corresponding hostname from getHosts().
+      List<BlockReplica> replicas = Lists.newArrayListWithExpectedSize(
+          loc.getNames().length);
+      for (int i = 0; i < loc.getNames().length; ++i) {
+        TNetworkAddress networkAddress =
+            BlockReplica.parseLocation(loc.getNames()[i]);
+        replicas.add(new BlockReplica(hostIndex_.getIndex(networkAddress),
+            cachedHosts.contains(loc.getHosts()[i])));
+      }
+      FileBlock currentBlock =
+          new FileBlock(loc.getOffset(), loc.getLength(), replicas);
+      THdfsFileBlock tHdfsFileBlock = currentBlock.toThrift();
+      fd.addThriftFileBlock(tHdfsFileBlock);
+      unknownFdDiskIds += loadDiskIds(loc, tHdfsFileBlock);
+    }
+    return unknownFdDiskIds;
+  }
+
+  /**
    * Loads the disk IDs for BlockLocation 'location' and its corresponding file block.
    * HDFS API for BlockLocation returns a storageID UUID string for each disk
    * hosting the block, which is then mapped to a 0-based integer id called disk ID.
@@ -388,6 +401,20 @@ public class HdfsTable extends Table {
   }
 
   /**
+   * Synthesize the block metadata for a given HdfsPartition object. Should only
+   * be called for FileSystems that do not support storage IDs.
+   */
+  private void synthesizeBlockMetadata(FileSystem fs, HdfsPartition partition)
+      throws IOException {
+    Preconditions.checkState(!FileSystemUtil.supportsStorageIds(fs));
+    HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
+    Path partitionPath = partition.getLocationPath();
+    partition.setFileDescriptors(new ArrayList<FileDescriptor>());
+    partsByPath.put(partitionPath, Lists.newArrayList(partition));
+    synthesizeBlockMetadata(fs, partitionPath, partsByPath);
+  }
+
+  /**
    * For filesystems that don't support BlockLocation API, synthesize file blocks
    * by manually splitting the file range into fixed-size blocks. That way, scan
    * ranges can be derived from file blocks as usual. All synthesized blocks are given
@@ -755,11 +782,62 @@ public class HdfsTable extends Table {
     loadMetadataAndDiskIds(dirsToLoad, partsByPath);
   }
 
-  private void loadMetadataAndDiskIds(HdfsPartition partition) throws CatalogException {
-    Path partDirPath = partition.getLocationPath();
-    HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
-    partsByPath.put(partDirPath, Lists.newArrayList(partition));
-    loadMetadataAndDiskIds(Lists.newArrayList(partDirPath), partsByPath);
+  /**
+   * Refreshes block metadata information for 'partition'. This method is optimized for
+   * the case where the files in the partition have not changed dramatically. It first
+   * uses a listStatus() call on the partition directory to detect files with changed
+   * mtime and fetches their block locations using the getFileBlockLocations() method.
+   * Our benchmarks suggest that the listStatus() call is much faster then the listFiles()
+   * (up to ~40x faster in some cases). The initial table load still uses the listFiles()
+   * on the data directory that fetches both the FileStatus as well as BlockLocations in
+   * a single call.
+   */
+  private void refreshFileMetadata(HdfsPartition partition) throws CatalogException {
+    Path partDir = partition.getLocationPath();
+    Preconditions.checkNotNull(partDir);
+    try {
+      FileSystem fs = partDir.getFileSystem(CONF);
+      if (!fs.exists(partDir)) {
+        partition.setFileDescriptors(new ArrayList<FileDescriptor>());
+        return;
+      }
+      if (!FileSystemUtil.supportsStorageIds(fs)) {
+        synthesizeBlockMetadata(fs, partition);
+        return;
+      }
+      // Index the partition file descriptors by their file names for O(1) look ups.
+      ImmutableMap<String, FileDescriptor> fileDescsByName = Maps.uniqueIndex(
+          partition.getFileDescriptors(), new Function<FileDescriptor, String>() {
+            public String apply(FileDescriptor desc) {
+              return desc.getFileName();
+            }
+          });
+      // Iterate through the current snapshot of the partition directory listing to
+      // figure out files that were newly added/modified.
+      List<FileDescriptor> newFileDescs = Lists.newArrayList();
+      int newPartSizeBytes = 0;
+      for (FileStatus fileStatus : fs.listStatus(partDir)) {
+        if (!FileSystemUtil.isValidDataFile(fileStatus)) continue;
+        String fileName = fileStatus.getPath().getName().toString();
+        FileDescriptor fd = fileDescsByName.get(fileName);
+        if (fd == null || partition.isMarkedCached() ||
+            fd.getFileLength() != fileStatus.getLen() ||
+            fd.getModificationTime() != fileStatus.getModificationTime()) {
+          fd = new FileDescriptor(fileName, fileStatus.getLen(),
+              fileStatus.getModificationTime());
+          setFdBlockMetadata(fd,
+              fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()));
+        }
+        newFileDescs.add(fd);
+        newPartSizeBytes += fileStatus.getLen();
+      }
+      partition.setFileDescriptors(newFileDescs);
+      numHdfsFiles_ += newFileDescs.size();
+      totalHdfsBytes_ += newPartSizeBytes;
+    } catch(IOException e) {
+      throw new CatalogException("Error loading block metadata for partition " +
+          partition.toString(), e);
+    }
   }
 
   /**
@@ -772,7 +850,7 @@ public class HdfsTable extends Table {
     LOG.info(String.format(
         "Loading file and block metadata for %s partitions from %s paths: %s",
         partsByPath.size(), locations.size(), getFullName()));
-    for (Path location: locations) { loadBlockMetadata(location, partsByPath); }
+    for (Path location: locations) loadBlockMetadata(location, partsByPath);
     LOG.info(String.format(
         "Loaded file and block metadata for %s partitions from %s paths: %s",
         partsByPath.size(), locations.size(), getFullName()));
@@ -831,7 +909,7 @@ public class HdfsTable extends Table {
       org.apache.hadoop.hive.metastore.api.Partition msPartition)
       throws CatalogException {
     HdfsPartition hdfsPartition = createPartition(storageDescriptor, msPartition);
-    loadMetadataAndDiskIds(hdfsPartition);
+    refreshFileMetadata(hdfsPartition);
     return hdfsPartition;
   }
 
@@ -1119,8 +1197,8 @@ public class HdfsTable extends Table {
     addDefaultPartition(msTbl.getSd());
     HdfsPartition part = createPartition(msTbl.getSd(), null);
    addPartition(part);
-    loadMetadataAndDiskIds(part);
     if (isMarkedCached_) part.markCached();
+    refreshFileMetadata(part);
   }
 
   /**
@@ -1436,7 +1514,7 @@ public class HdfsTable extends Table {
         // WRITE_ONLY the table's access level should be NONE.
         accessLevel_ = TAccessLevel.READ_ONLY;
       }
-      loadMetadataAndDiskIds(partition);
+      refreshFileMetadata(partition);
     }
   }
 
@@ -1492,7 +1570,7 @@ public class HdfsTable extends Table {
     numHdfsFiles_ -= partition.getNumFileDescriptors();
    totalHdfsBytes_ -= partition.getSize();
     Preconditions.checkState(numHdfsFiles_ >= 0 && totalHdfsBytes_ >= 0);
-    loadMetadataAndDiskIds(partition);
+    refreshFileMetadata(partition);
   }
 
   @Override
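
To illustrate the API difference the commit message describes, here is a minimal,
hypothetical sketch (it is not Impala code; the RefreshSketch class, refresh() method,
and cachedFiles map are illustrative stand-ins for Impala's per-file descriptors). The
point it shows: listStatus() returns plain FileStatus entries (name, length, mtime)
without block locations, so the more expensive getFileBlockLocations() call is paid
only for files that are new or modified, whereas listFiles() would fetch block
locations for every file regardless of whether it changed.

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RefreshSketch {
  /**
   * Hypothetical sketch: re-fetch block locations only for files whose length or
   * modification time differs from the cached FileStatus. 'cachedFiles' stands in
   * for the per-file state a catalog would keep between refreshes.
   */
  static void refresh(FileSystem fs, Path partDir, Map<String, FileStatus> cachedFiles)
      throws IOException {
    // Cheap call: returns name/length/mtime for each entry, without block locations.
    for (FileStatus status : fs.listStatus(partDir)) {
      if (status.isDirectory()) continue;
      String name = status.getPath().getName();
      FileStatus cached = cachedFiles.get(name);
      boolean changed = cached == null
          || cached.getLen() != status.getLen()
          || cached.getModificationTime() != status.getModificationTime();
      if (changed) {
        // Only new or modified files pay for the block location lookup.
        BlockLocation[] locations =
            fs.getFileBlockLocations(status, 0, status.getLen());
        // ... rebuild this file's block metadata from 'locations' ...
      }
      cachedFiles.put(name, status);
    }
  }
}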
