Repository: incubator-impala Updated Branches: refs/heads/master b9034ea0d -> b8b64e110
IMPALA-4172/IMPALA-3653: Improvements to block metadata loading

This patch improves block metadata loading (locations and disk storage IDs) for partitioned and unpartitioned tables in the Catalog server.

Without this patch:
------------------
We loop over each and every file in the table/partition directories and call getFileBlockLocations() on it to obtain the block metadata. This results in a large number of RPC calls to the Namenode, especially for tables with a large number of files/partitions.

With this patch:
---------------
We move the block metadata querying to the listStatus() call, which accepts a directory as input and fetches the 'BlockLocation' objects for every file in that directory recursively. This improves metadata loading in the following ways:
- For non-partitioned tables, we query all the BlockLocations in a single RPC call on the base table directory and load the corresponding disk IDs.
- For partitioned tables, we query the BlockLocations for all the partitions residing under the base table directory in a single RPC, and then load every partition with a non-default partition directory separately.
- REFRESH on a table now reloads the block metadata from scratch for every data file every time, so it can be used as a replacement for invalidate in situations like HDFS block rebalancing that require a block metadata update.

Also, this patch does away with the VolumeIds returned by the HDFS NN and uses the new storage IDs returned by the BlockLocation class. These storage IDs are UUID strings and are therefore mapped to a per-node 0-based index, as expected by the backend. In upcoming versions of the Hadoop APIs, getFileBlockStorageLocations() is deprecated; instead, listStatus() returns BlockLocations with the storage IDs embedded. This patch makes use of that improvement to avoid an additional RPC to the NN to fetch the storage locations.
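For illustration only, here is a minimal standalone sketch of the HDFS API pattern the patch relies on: a single recursive listing call returns LocatedFileStatus objects that already carry BlockLocations, and each BlockLocation exposes storage UUIDs that can be mapped to small per-host 0-based disk ids. This is not Impala code; the class name and the simplified per-host maps are hypothetical stand-ins for HdfsTable's block loading and DiskIdMapper, and it assumes a Hadoop client/cluster where BlockLocation.getStorageIds() is populated.

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;

    /** Illustrative sketch: one recursive listing yields block locations and storage ids. */
    public class BlockMetadataSketch {
      public static void main(String[] args) throws IOException {
        Path tableDir = new Path(args[0]);
        FileSystem fs = tableDir.getFileSystem(new Configuration());
        // Simplified stand-in for DiskIdMapper: map "host/storageUuid" to a 0-based disk id.
        Map<String, Integer> uuidToDiskId = new HashMap<>();
        Map<String, Integer> nextIdPerHost = new HashMap<>();
        // One listing enumerates every file under tableDir recursively, with
        // BlockLocations (and storage ids) already attached to each status.
        RemoteIterator<LocatedFileStatus> it = fs.listFiles(tableDir, true);
        while (it.hasNext()) {
          LocatedFileStatus file = it.next();
          for (BlockLocation loc : file.getBlockLocations()) {
            String[] hosts = loc.getHosts();
            String[] storageIds = loc.getStorageIds();
            for (int i = 0; i < storageIds.length && i < hosts.length; ++i) {
              String key = hosts[i] + "/" + storageIds[i];
              Integer diskId = uuidToDiskId.get(key);
              if (diskId == null) {
                // First time this storage UUID is seen on this host: assign the next
                // sequential index for that host.
                diskId = nextIdPerHost.merge(hosts[i], 1, Integer::sum) - 1;
                uuidToDiskId.put(key, diskId);
              }
              System.out.println(file.getPath() + " block@" + loc.getOffset()
                  + " host=" + hosts[i] + " diskId=" + diskId);
            }
          }
        }
      }
    }

The per-host counter keeps the long UUID strings out of the per-block metadata, so downstream consumers only ever see small integers, which mirrors the intent of the DiskIdMapper introduced in the diff below.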
Change-Id: Ie127658172e6e70dae441374530674a4ac9d5d26 Reviewed-on: http://gerrit.cloudera.org:8080/5148 Reviewed-by: Bharath Vissapragada <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6662c553 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6662c553 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6662c553 Branch: refs/heads/master Commit: 6662c55364b1c429340edc1ffd14323167f7b561 Parents: b9034ea Author: Bharath Vissapragada <[email protected]> Authored: Sun Nov 13 22:15:41 2016 -0800 Committer: Internal Jenkins <[email protected]> Committed: Sat Dec 3 21:17:46 2016 +0000 ---------------------------------------------------------------------- .../impala/catalog/CatalogServiceCatalog.java | 2 +- .../org/apache/impala/catalog/DiskIdMapper.java | 88 +++ .../apache/impala/catalog/HdfsPartition.java | 7 +- .../org/apache/impala/catalog/HdfsTable.java | 576 ++++++++----------- .../apache/impala/common/FileSystemUtil.java | 30 +- .../impala/service/CatalogOpExecutor.java | 3 +- 6 files changed, 366 insertions(+), 340 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6662c553/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index 6fbcccc..85d92cb 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -928,7 +928,7 @@ public class CatalogServiceCatalog extends Catalog { throw new TableLoadingException("Error loading metadata for table: " + db.getName() + "." + tblName.getTable_name(), e); } - tbl.load(true, msClient.getHiveClient(), msTbl); + tbl.load(false, msClient.getHiveClient(), msTbl); } tbl.setCatalogVersion(newCatalogVersion); return tbl; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6662c553/fe/src/main/java/org/apache/impala/catalog/DiskIdMapper.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/DiskIdMapper.java b/fe/src/main/java/org/apache/impala/catalog/DiskIdMapper.java new file mode 100644 index 0000000..6fdcef0 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/catalog/DiskIdMapper.java @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.catalog; + +import com.google.common.collect.Maps; +import com.google.common.base.Strings; +import com.google.common.base.Preconditions; + +import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; + +/** + * A singleton class that maps HDFS storage-UUIDs to per-host 0-based, sequential disk + * ids. This mapping is internally implemented as a global static object shared + * across all the table instances. The rationale behind this implementation is + * - To maintain a consistent mapping across all the table instances so that the + * assignment of scan ranges to I/O threads is balanced and consistent for all scans + * on the same host. + * - To Reduce memory usage in the Catalog since UUIDs can potentially consume a lot of + * memory when maintained per table instance. + */ +public class DiskIdMapper { + + public static DiskIdMapper INSTANCE = new DiskIdMapper(); + + private DiskIdMapper() {} + + // Maps each storage ID UUID string returned by the BlockLocation API, to a per-node + // sequential 0-based integer disk id used by the BE scanners. This assumes that + // the storage ID of a particular disk is unique across all the nodes in the cluster. + private ConcurrentHashMap<String, Integer> storageUuidToDiskId = + new ConcurrentHashMap<String, Integer>(); + + // Per-host ID generator for storage UUID to integer ID mapping. This maps each host + // to the corresponding latest 0-based integer ID. + private HashMap<String, Integer> storageIdGenerator = Maps.newHashMap(); + + /** + * Returns a disk id (0-based) index for storageUuid on host 'host'. Generates a + * new disk ID for storageUuid if one doesn't already exist. We cache the mappings + * already generated for faster lookups. + * + * TODO: It is quite possible that there will be lock contention in this method during + * the initial metadata load. Figure out ways to fix it using finer locking scheme. + */ + public int getDiskId(String host, String storageUuid) { + Preconditions.checkState(!Strings.isNullOrEmpty(host)); + // Initialize the diskId as -1 to indicate it is unknown + int diskId = -1; + // Check if an existing mapping is already present. This is intentionally kept + // out of the synchronized block to avoid contention for lookups. Once a reasonable + // amount of data loading is done and storageIdtoInt is populated with storage IDs + // across the cluster, we expect to have a good hit rate. + Integer intId = storageUuidToDiskId.get(storageUuid); + if (intId != null) return intId; + synchronized (storageIdGenerator) { + // Mapping might have been added by another thread that entered the synchronized + // block first. + intId = storageUuidToDiskId.get(storageUuid); + if (intId != null) return intId; + // No mapping exists, create a new disk ID for 'storageUuid' + if (storageIdGenerator.containsKey(host)) { + diskId = storageIdGenerator.get(host) + 1; + } else { + // First diskId of this host. 
+ diskId = 0; + } + storageIdGenerator.put(host, new Integer(diskId)); + storageUuidToDiskId.put(storageUuid, new Integer(diskId)); + } + return diskId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6662c553/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java index 8718419..c240613 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java @@ -107,7 +107,11 @@ public class HdfsPartition implements Comparable<HdfsPartition> { } public void addFileBlock(FileBlock blockMd) { - fileDescriptor_.addToFile_blocks(blockMd.toThrift()); + addThriftFileBlock(blockMd.toThrift()); + } + + public void addThriftFileBlock(THdfsFileBlock block) { + fileDescriptor_.addToFile_blocks(block); } public static FileDescriptor fromThrift(THdfsFileDesc desc) { @@ -381,6 +385,7 @@ public class HdfsPartition implements Comparable<HdfsPartition> { public String getLocation() { return (location_ != null) ? location_.toString() : null; } + public Path getLocationPath() { return new Path(getLocation()); } public long getId() { return id_; } public HdfsTable getTable() { return table_; } public void setNumRows(long numRows) { numRows_ = numRows; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6662c553/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index 2a30cec..386ef79 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -36,8 +36,9 @@ import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockStorageLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.VolumeId; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -90,6 +91,7 @@ import org.apache.impala.util.MetaStoreUtil; import org.apache.impala.util.TAccessLevelUtil; import org.apache.impala.util.TResultRowBuilder; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -177,8 +179,6 @@ public class HdfsTable extends Table { // data is cached or that all/any partitions are cached. private boolean isMarkedCached_ = false; - private static boolean hasLoggedDiskIdFormatWarning_ = false; - // Array of sorted maps storing the association between partition values and // partition ids. There is one sorted map per partition key. // TODO: We should not populate this for HdfsTable objects stored in the catalog @@ -213,10 +213,6 @@ public class HdfsTable extends Table { private HdfsPartitionLocationCompressor partitionLocationCompressor_; - // Map of file names to file descriptors for each partition location (directory). 
- private Map<String, Map<String, FileDescriptor>> - perPartitionFileDescMap_ = Maps.newHashMap(); - // Total number of Hdfs files in this table. Set in load(). private long numHdfsFiles_; @@ -249,48 +245,6 @@ public class HdfsTable extends Table { // and its usage in getFileSystem suggests it should be. private static final Configuration CONF = new Configuration(); - private static final boolean SUPPORTS_VOLUME_ID; - - // Wrapper around a FileSystem object to hash based on the underlying FileSystem's - // scheme and authority. - private static class FsKey { - FileSystem filesystem; - - public FsKey(FileSystem fs) { filesystem = fs; } - - @Override - public int hashCode() { return filesystem.getUri().hashCode(); } - - @Override - public boolean equals(Object o) { - if (o == this) return true; - if (o != null && o instanceof FsKey) { - URI uri = filesystem.getUri(); - URI otherUri = ((FsKey)o).filesystem.getUri(); - return uri.equals(otherUri); - } - return false; - } - - @Override - public String toString() { return filesystem.getUri().toString(); } - } - - // Keeps track of newly added THdfsFileBlock metadata and its corresponding - // BlockLocation. For each i, blocks.get(i) corresponds to locations.get(i). Once - // all the new file blocks are collected, the disk volume IDs are retrieved in one - // batched DFS call. - private static class FileBlocksInfo { - final List<THdfsFileBlock> blocks = Lists.newArrayList(); - final List<BlockLocation> locations = Lists.newArrayList(); - - public void addBlocks(List<THdfsFileBlock> b, List<BlockLocation> l) { - Preconditions.checkState(b.size() == l.size()); - blocks.addAll(b); - locations.addAll(l); - } - } - public HdfsTable(org.apache.hadoop.hive.metastore.api.Table msTbl, Db db, String name, String owner) { super(msTbl, db, name, owner); @@ -298,39 +252,6 @@ public class HdfsTable extends Table { new HdfsPartitionLocationCompressor(numClusteringCols_); } - static { - SUPPORTS_VOLUME_ID = - CONF.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, - DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT); - } - - /** - * Returns a disk id (0-based) index from the Hdfs VolumeId object. - * There is currently no public API to get at the volume id. We'll have to get it - * by accessing the internals. - */ - private static int getDiskId(VolumeId hdfsVolumeId) { - // Initialize the diskId as -1 to indicate it is unknown - int diskId = -1; - - if (hdfsVolumeId != null) { - // TODO: this is a hack and we'll have to address this by getting the - // public API. Also, we need to be very mindful of this when we change - // the version of HDFS. - String volumeIdString = hdfsVolumeId.toString(); - // This is the hacky part. The toString is currently the underlying id - // encoded as hex. - byte[] volumeIdBytes = StringUtils.hexStringToByte(volumeIdString); - if (volumeIdBytes != null && volumeIdBytes.length == 4) { - diskId = Bytes.toInt(volumeIdBytes); - } else if (!hasLoggedDiskIdFormatWarning_) { - LOG.warn("wrong disk id format: " + volumeIdString); - hasLoggedDiskIdFormatWarning_ = true; - } - } - return diskId; - } - public boolean spansMultipleFileSystems() { return multipleFileSystems_; } /** @@ -360,74 +281,181 @@ public class HdfsTable extends Table { } /** - * Queries the filesystem to load the file block metadata (e.g. DFS blocks) for the - * given file. Adds the newly created block metadata and block location to the - * perFsFileBlocks, so that the disk IDs for each block can be retrieved with one - * call to DFS. 
+ * Drops and re-loads the block metadata for all partitions in 'partsByPath' whose + * location is under the given 'dirPath'. It involves the following steps: + * - Clear the current block metadata of the partitions. + * - Call FileSystem.listStatus() on 'dirPath' to fetch the BlockLocations for each + * file under it recursively. + * - For every valid data file, map it to a partition from 'partsByPath' (if one exists) + * and enumerate all its blocks and their corresponding hosts and disk IDs. + * TODO: Split this method into more logical methods for cleaner code. */ - private void loadBlockMetadata(FileSystem fs, FileStatus file, FileDescriptor fd, - HdfsFileFormat fileFormat, Map<FsKey, FileBlocksInfo> perFsFileBlocks) { - Preconditions.checkNotNull(fd); - Preconditions.checkNotNull(perFsFileBlocks); - Preconditions.checkArgument(!file.isDirectory()); - if (LOG.isTraceEnabled()) { - LOG.trace("load block md for " + name_ + " file " + fd.getFileName()); - } - - if (!FileSystemUtil.hasGetFileBlockLocations(fs)) { - synthesizeBlockMetadata(fs, fd, fileFormat); - return; - } + private void loadBlockMetadata(Path dirPath, + HashMap<Path, List<HdfsPartition>> partsByPath) { try { - BlockLocation[] locations = fs.getFileBlockLocations(file, 0, file.getLen()); - Preconditions.checkNotNull(locations); - - // Loop over all blocks in the file. - for (BlockLocation loc: locations) { - Preconditions.checkNotNull(loc); - // Get the location of all block replicas in ip:port format. - String[] blockHostPorts = loc.getNames(); - // Get the hostnames for all block replicas. Used to resolve which hosts - // contain cached data. The results are returned in the same order as - // block.getNames() so it allows us to match a host specified as ip:port to - // corresponding hostname using the same array index. - String[] blockHostNames = loc.getHosts(); - Preconditions.checkState(blockHostNames.length == blockHostPorts.length); - // Get the hostnames that contain cached replicas of this block. - Set<String> cachedHosts = - Sets.newHashSet(Arrays.asList(loc.getCachedHosts())); - Preconditions.checkState(cachedHosts.size() <= blockHostNames.length); - - // Now enumerate all replicas of the block, adding any unknown hosts - // to hostMap_/hostList_. The host ID (index in to the hostList_) for each - // replica is stored in replicaHostIdxs. - List<BlockReplica> replicas = Lists.newArrayListWithExpectedSize( - blockHostPorts.length); - for (int i = 0; i < blockHostPorts.length; ++i) { - TNetworkAddress networkAddress = BlockReplica.parseLocation(blockHostPorts[i]); - Preconditions.checkState(networkAddress != null); - replicas.add(new BlockReplica(hostIndex_.getIndex(networkAddress), - cachedHosts.contains(blockHostNames[i]))); + FileSystem fs = dirPath.getFileSystem(CONF); + // No need to load blocks for empty partitions list. + if (partsByPath.size() == 0 || !fs.exists(dirPath)) return; + if (LOG.isTraceEnabled()) { + LOG.trace("Loading block md for " + name_ + " directory " + dirPath.toString()); + } + // Clear the state of partitions under dirPath since they are now updated based + // on the current snapshot of files in the directory. 
+ for (Map.Entry<Path, List<HdfsPartition>> entry: partsByPath.entrySet()) { + Path partDir = entry.getKey(); + if (!FileSystemUtil.isDescendantPath(partDir, dirPath)) continue; + for (HdfsPartition partition: entry.getValue()) { + partition.setFileDescriptors(new ArrayList<FileDescriptor>()); } - fd.addFileBlock(new FileBlock(loc.getOffset(), loc.getLength(), replicas)); } - // Remember the THdfsFileBlocks and corresponding BlockLocations. Once all the - // blocks are collected, the disk IDs will be queried in one batch per filesystem. - addPerFsFileBlocks(perFsFileBlocks, fs, fd.getFileBlocks(), - Arrays.asList(locations)); + // For file systems that do not support BlockLocation API, we manually synthesize + // block location metadata based on file formats. + if (!FileSystemUtil.supportsStorageIds(fs)) { + synthesizeBlockMetadata(fs, dirPath, partsByPath); + return; + } + int unknownDiskIdCount = 0; + RemoteIterator<LocatedFileStatus> fileStatusIter = fs.listFiles(dirPath, true); + while (fileStatusIter.hasNext()) { + LocatedFileStatus fileStatus = fileStatusIter.next(); + if (!FileSystemUtil.isValidDataFile(fileStatus)) continue; + // Find the partition that this file belongs (if any). + Path partPathDir = fileStatus.getPath().getParent(); + Preconditions.checkNotNull(partPathDir); + + List<HdfsPartition> partitions = partsByPath.get(partPathDir); + // Skip if this file does not belong to any known partition. + if (partitions == null) { + if (LOG.isTraceEnabled()) { + LOG.trace("File " + fileStatus.getPath().toString() + " doesn't correspond " + + " to a known partition. Skipping metadata load for this file."); + } + continue; + } + String fileName = fileStatus.getPath().getName(); + FileDescriptor fd = new FileDescriptor(fileName, fileStatus.getLen(), + fileStatus.getModificationTime()); + BlockLocation[] locations = fileStatus.getBlockLocations(); + String partPathDirName = partPathDir.toString(); + for (BlockLocation loc: locations) { + Set<String> cachedHosts = Sets.newHashSet(loc.getCachedHosts()); + // Enumerate all replicas of the block, adding any unknown hosts + // to hostIndex_. We pick the network address from getNames() and + // map it to the corresponding hostname from getHosts(). + List<BlockReplica> replicas = Lists.newArrayListWithExpectedSize( + loc.getNames().length); + for (int i = 0; i < loc.getNames().length; ++i) { + TNetworkAddress networkAddress = + BlockReplica.parseLocation(loc.getNames()[i]); + replicas.add(new BlockReplica(hostIndex_.getIndex(networkAddress), + cachedHosts.contains(loc.getHosts()[i]))); + } + FileBlock currentBlock = + new FileBlock(loc.getOffset(), loc.getLength(), replicas); + THdfsFileBlock tHdfsFileBlock = currentBlock.toThrift(); + fd.addThriftFileBlock(tHdfsFileBlock); + unknownDiskIdCount += loadDiskIds(loc, tHdfsFileBlock); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Adding file md dir: " + partPathDirName + " file: " + fileName); + } + // Update the partitions' metadata that this file belongs to. 
+ for (HdfsPartition partition: partitions) { + partition.getFileDescriptors().add(fd); + numHdfsFiles_++; + totalHdfsBytes_ += fd.getFileLength(); + } + } + if (unknownDiskIdCount > 0) { + if (LOG.isWarnEnabled()) { + LOG.warn("Unknown disk id count for filesystem " + fs + ":" + + unknownDiskIdCount); + } + } + } catch (IOException e) { + throw new RuntimeException("Error loading block metadata for directory " + + dirPath.toString() + ": " + e.getMessage(), e); + } + } + + /** + * Loads the disk IDs for BlockLocation 'location' and its corresponding file block. + * HDFS API for BlockLocation returns a storageID UUID string for each disk + * hosting the block, which is then mapped to a 0-based integer id called disk ID. + * Returns the number of unknown disk IDs encountered in this process. + */ + private int loadDiskIds(BlockLocation location, THdfsFileBlock fileBlock) { + int unknownDiskIdCount = 0; + String[] storageIds = location.getStorageIds(); + String[] hosts; + try { + hosts = location.getHosts(); } catch (IOException e) { - throw new RuntimeException("couldn't determine block locations for path '" + - file.getPath() + "':\n" + e.getMessage(), e); + LOG.error("Couldn't get hosts for block: " + location.toString(), e); + return unknownDiskIdCount; + } + if (storageIds.length != hosts.length) { + LOG.error("Number of storage IDs and number of hosts for block: " + location + .toString() + " mismatch. Skipping disk ID loading for this block."); + return unknownDiskIdCount; } + int[] diskIDs = new int[storageIds.length]; + for (int i = 0; i < storageIds.length; ++i) { + if (Strings.isNullOrEmpty(storageIds[i])) { + diskIDs[i] = -1; + ++unknownDiskIdCount; + } else { + diskIDs[i] = DiskIdMapper.INSTANCE.getDiskId(hosts[i], storageIds[i]); + } + } + FileBlock.setDiskIds(diskIDs, fileBlock); + return unknownDiskIdCount; } /** - * For filesystems that don't override getFileBlockLocations, synthesize file blocks + * For filesystems that don't support BlockLocation API, synthesize file blocks * by manually splitting the file range into fixed-size blocks. That way, scan * ranges can be derived from file blocks as usual. All synthesized blocks are given * an invalid network address so that the scheduler will treat them as remote. */ - private void synthesizeBlockMetadata(FileSystem fs, FileDescriptor fd, + private void synthesizeBlockMetadata(FileSystem fs, Path dirPath, HashMap<Path, + List<HdfsPartition>> partsByPath) throws IOException { + RemoteIterator<LocatedFileStatus> fileStatusIter = fs.listFiles(dirPath, true); + while (fileStatusIter.hasNext()) { + LocatedFileStatus fileStatus = fileStatusIter.next(); + if (!FileSystemUtil.isValidDataFile(fileStatus)) continue; + Path partPathDir = fileStatus.getPath().getParent(); + Preconditions.checkNotNull(partPathDir); + List<HdfsPartition> partitions = partsByPath.get(partPathDir); + // Skip if this file does not belong to any known partition. + if (partitions == null) { + if (LOG.isTraceEnabled()) { + LOG.trace("File " + fileStatus.getPath().toString() + " doesn't correspond " + + " to a known partition. Skipping metadata load for this file."); + } + continue; + } + String fileName = fileStatus.getPath().getName(); + FileDescriptor fd = new FileDescriptor(fileName, fileStatus.getLen(), + fileStatus.getModificationTime()); + Preconditions.checkState(partitions.size() > 0); + // For the purpose of synthesizing block metadata, we assume that all partitions + // with the same location have the same file format. 
+ HdfsFileFormat fileFormat = partitions.get(0).getFileFormat(); + synthesizeFdBlockMetadata(fs, fd, fileFormat); + // Update the partitions' metadata that this file belongs to. + for (HdfsPartition partition: partitions) { + partition.getFileDescriptors().add(fd); + numHdfsFiles_++; + totalHdfsBytes_ += fd.getFileLength(); + } + } + } + + /** + * Helper method to synthesize block metadata for file descriptor fd. + */ + private void synthesizeFdBlockMetadata(FileSystem fs, FileDescriptor fd, HdfsFileFormat fileFormat) { long start = 0; long remaining = fd.getFileLength(); @@ -449,72 +477,6 @@ public class HdfsTable extends Table { } } - /** - * Populates disk/volume ID metadata inside the newly created THdfsFileBlocks. - * perFsFileBlocks maps from each filesystem to a FileBLocksInfo. The first list - * contains the newly created THdfsFileBlocks and the second contains the - * corresponding BlockLocations. - */ - private void loadDiskIds(Map<FsKey, FileBlocksInfo> perFsFileBlocks) { - if (!SUPPORTS_VOLUME_ID) return; - // Loop over each filesystem. If the filesystem is DFS, retrieve the volume IDs - // for all the blocks. - for (FsKey fsKey: perFsFileBlocks.keySet()) { - FileSystem fs = fsKey.filesystem; - // Only DistributedFileSystem has getFileBlockStorageLocations(). It's not even - // part of the FileSystem interface, so we'll need to downcast. - if (!(fs instanceof DistributedFileSystem)) continue; - - if (LOG.isTraceEnabled()) { - LOG.trace("Loading disk ids for: " + getFullName() + ". nodes: " + - hostIndex_.size() + ". filesystem: " + fsKey); - } - DistributedFileSystem dfs = (DistributedFileSystem)fs; - FileBlocksInfo blockLists = perFsFileBlocks.get(fsKey); - Preconditions.checkNotNull(blockLists); - BlockStorageLocation[] storageLocs = null; - try { - // Get the BlockStorageLocations for all the blocks - storageLocs = dfs.getFileBlockStorageLocations(blockLists.locations); - } catch (IOException e) { - LOG.error("Couldn't determine block storage locations for filesystem " + - fs + ":\n" + e.getMessage()); - continue; - } - if (storageLocs == null || storageLocs.length == 0) { - LOG.warn("Attempted to get block locations for filesystem " + fs + - " but the call returned no results"); - continue; - } - if (storageLocs.length != blockLists.locations.size()) { - // Block locations and storage locations didn't match up. - LOG.error("Number of block storage locations not equal to number of blocks: " - + "#storage locations=" + Long.toString(storageLocs.length) - + " #blocks=" + Long.toString(blockLists.locations.size())); - continue; - } - long unknownDiskIdCount = 0; - // Attach volume IDs given by the storage location to the corresponding - // THdfsFileBlocks. - for (int locIdx = 0; locIdx < storageLocs.length; ++locIdx) { - VolumeId[] volumeIds = storageLocs[locIdx].getVolumeIds(); - THdfsFileBlock block = blockLists.blocks.get(locIdx); - // Convert opaque VolumeId to 0 based ids. - // TODO: the diskId should be eventually retrievable from Hdfs when the - // community agrees this API is useful. 
- int[] diskIds = new int[volumeIds.length]; - for (int i = 0; i < volumeIds.length; ++i) { - diskIds[i] = getDiskId(volumeIds[i]); - if (diskIds[i] < 0) ++unknownDiskIdCount; - } - FileBlock.setDiskIds(diskIds, block); - } - if (unknownDiskIdCount > 0) { - LOG.warn("Unknown disk id count for filesystem " + fs + ":" + unknownDiskIdCount); - } - } - } - @Override public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.TABLE; @@ -700,7 +662,6 @@ public class HdfsTable extends Table { nameToPartitionMap_.clear(); partitionValuesMap_.clear(); nullPartitionIds_.clear(); - perPartitionFileDescMap_.clear(); // Initialize partitionValuesMap_ and nullPartitionIds_. Also reset column stats. for (int i = 0; i < numClusteringCols_; ++i) { getColumns().get(i).getStats().setNumNulls(0); @@ -734,7 +695,9 @@ public class HdfsTable extends Table { /** * Create HdfsPartition objects corresponding to 'msPartitions' and add them to this * table's partition list. Any partition metadata will be reset and loaded from - * scratch. + * scratch. For each partition created, we load the block metadata for each data file + * under it. We optimize the block metadata loading by grouping together the name node + * requests for all the partitions under the table base directory into a single RPC. * * If there are no partitions in the Hive metadata, a single partition is added with no * partition keys. @@ -745,29 +708,37 @@ public class HdfsTable extends Table { CatalogException { Preconditions.checkNotNull(msTbl); initializePartitionMetadata(msTbl); - // Map of filesystem to the file blocks for new/modified FileDescriptors. Blocks in - // this map will have their disk volume IDs information (re)loaded. This is used to - // speed up the incremental refresh of a table's metadata by skipping unmodified, - // previously loaded blocks. - Map<FsKey, FileBlocksInfo> blocksToLoad = Maps.newHashMap(); + // Map of partition paths to their corresponding HdfsPartition objects. Populated + // using createPartition() calls. A single partition path can correspond to multiple + // partitions. + HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap(); + Path tblLocation = getHdfsBaseDirPath(); + // List of directories that we scan for block locations. We optimize the block metadata + // loading to reduce the number of RPCs to the NN by separately loading partitions + // with default directory paths (under the base table directory) and non-default + // directory paths. For the former we issue a single RPC to the NN to load all the + // blocks from hdfsBaseDir_ and for the latter we load each of the partition directory + // separately. + // TODO: We can still do some advanced optimization by grouping all the partition + // directories under the same ancestor path up the tree. + List<Path> dirsToLoad = Lists.newArrayList(tblLocation); + FileSystem fs = tblLocation.getFileSystem(CONF); if (msTbl.getPartitionKeysSize() == 0) { Preconditions.checkArgument(msPartitions == null || msPartitions.isEmpty()); // This table has no partition key, which means it has no declared partitions. // We model partitions slightly differently to Hive - every file must exist in a // partition, so add a single partition with no keys which will get all the // files in the table's root directory. 
- HdfsPartition part = createPartition(msTbl.getSd(), null, blocksToLoad); + HdfsPartition part = createPartition(msTbl.getSd(), null); + partsByPath.put(tblLocation, Lists.newArrayList(part)); if (isMarkedCached_) part.markCached(); addPartition(part); - Path location = new Path(hdfsBaseDir_); - FileSystem fs = location.getFileSystem(CONF); - if (fs.exists(location)) { - accessLevel_ = getAvailableAccessLevel(fs, location); + if (fs.exists(tblLocation)) { + accessLevel_ = getAvailableAccessLevel(fs, tblLocation); } } else { for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) { - HdfsPartition partition = createPartition(msPartition.getSd(), msPartition, - blocksToLoad); + HdfsPartition partition = createPartition(msPartition.getSd(), msPartition); addPartition(partition); // If the partition is null, its HDFS path does not exist, and it was not added // to this table's partition list. Skip the partition. @@ -784,11 +755,41 @@ public class HdfsTable extends Table { // WRITE_ONLY the table's access level should be NONE. accessLevel_ = TAccessLevel.READ_ONLY; } + Path partDir = new Path(msPartition.getSd().getLocation()); + List<HdfsPartition> parts = partsByPath.get(partDir); + if (parts == null) { + partsByPath.put(partDir, Lists.newArrayList(partition)); + } else { + parts.add(partition); + } + if (!dirsToLoad.contains(partDir) && + !FileSystemUtil.isDescendantPath(partDir, tblLocation)) { + // This partition has a custom filesystem location. Load its file/block + // metadata separately by adding it to the list of dirs to load. + dirsToLoad.add(partDir); + } } } - loadDiskIds(blocksToLoad); + if (LOG.isTraceEnabled()) LOG.trace("partsByPath size: " + partsByPath.size()); + loadMetadataAndDiskIds(dirsToLoad, partsByPath); } + private void loadMetadataAndDiskIds(HdfsPartition partition) throws CatalogException { + Path partDirPath = partition.getLocationPath(); + HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap(); + partsByPath.put(partDirPath, Lists.newArrayList(partition)); + loadMetadataAndDiskIds(Lists.newArrayList(partDirPath), partsByPath); + } + + /** + * Helper method to load the block locations from each directory in 'locations' + * and filtering only the paths from 'partsByPath'. Also loads the disk IDs + * corresponding to these block locations. + */ + private void loadMetadataAndDiskIds(List<Path> locations, + HashMap<Path, List<HdfsPartition>> partsByPath) { + for (Path location: locations) { loadBlockMetadata(location, partsByPath); } + } /** * Gets the AccessLevel that is available for Impala for this table based on the * permissions Impala has on the given path. If the path does not exist, recurses up @@ -828,31 +829,25 @@ public class HdfsTable extends Table { * Throws CatalogException if the supplied storage descriptor contains metadata that * Impala can't understand. 
*/ - public HdfsPartition createPartition(StorageDescriptor storageDescriptor, + public HdfsPartition createAndLoadPartition(StorageDescriptor storageDescriptor, org.apache.hadoop.hive.metastore.api.Partition msPartition) throws CatalogException { - Map<FsKey, FileBlocksInfo> blocksToLoad = Maps.newHashMap(); - HdfsPartition hdfsPartition = createPartition(storageDescriptor, msPartition, - blocksToLoad); - loadDiskIds(blocksToLoad); + HdfsPartition hdfsPartition = createPartition(storageDescriptor, msPartition); + loadMetadataAndDiskIds(hdfsPartition); return hdfsPartition; } /** * Creates a new HdfsPartition from a specified StorageDescriptor and an HMS partition - * object. It populates 'perFsFileBlock' with the blocks to be loaded for each file in - * the partition directory. + * object. */ private HdfsPartition createPartition(StorageDescriptor storageDescriptor, - org.apache.hadoop.hive.metastore.api.Partition msPartition, - Map<FsKey, FileBlocksInfo> perFsFileBlocks) + org.apache.hadoop.hive.metastore.api.Partition msPartition) throws CatalogException { HdfsStorageDescriptor fileFormatDescriptor = HdfsStorageDescriptor.fromStorageDescriptor(this.name_, storageDescriptor); List<LiteralExpr> keyValues = Lists.newArrayList(); - boolean isMarkedCached = isMarkedCached_; if (msPartition != null) { - isMarkedCached = HdfsCachingUtil.validateCacheParams(msPartition.getParameters()); // Load key values for (String partitionKey: msPartition.getValues()) { Type type = getColumns().get(keyValues.size()).getType(); @@ -869,12 +864,7 @@ public class HdfsTable extends Table { } } } - try { - Expr.analyze(keyValues, null); - } catch (AnalysisException e) { - // should never happen - throw new IllegalStateException(e); - } + for (Expr v: keyValues) v.analyzeNoThrow(null); } Path partDirPath = new Path(storageDescriptor.getLocation()); @@ -882,12 +872,12 @@ public class HdfsTable extends Table { FileSystem fs = partDirPath.getFileSystem(CONF); multipleFileSystems_ = multipleFileSystems_ || !FileSystemUtil.isPathOnFileSystem(new Path(getLocation()), fs); - updatePartitionFds(partDirPath, isMarkedCached, - fileFormatDescriptor.getFileFormat(), perFsFileBlocks); + if (msPartition != null) { + HdfsCachingUtil.validateCacheParams(msPartition.getParameters()); + } HdfsPartition partition = new HdfsPartition(this, msPartition, keyValues, fileFormatDescriptor, - perPartitionFileDescMap_.get(partDirPath.toString()).values(), - getAvailableAccessLevel(fs, partDirPath)); + new ArrayList<FileDescriptor>(), getAvailableAccessLevel(fs, partDirPath)); partition.checkWellFormed(); return partition; } catch (IOException e) { @@ -896,21 +886,6 @@ public class HdfsTable extends Table { } /** - * Add the given THdfsFileBlocks and BlockLocations to the FileBlockInfo for the - * given filesystem. - */ - private void addPerFsFileBlocks(Map<FsKey, FileBlocksInfo> fsToBlocks, FileSystem fs, - List<THdfsFileBlock> blocks, List<BlockLocation> locations) { - FsKey fsKey = new FsKey(fs); - FileBlocksInfo infos = fsToBlocks.get(fsKey); - if (infos == null) { - infos = new FileBlocksInfo(); - fsToBlocks.put(fsKey, infos); - } - infos.addBlocks(blocks, locations); - } - - /** * Adds the partition to the HdfsTable. Throws a CatalogException if the partition * already exists in this table. 
*/ @@ -983,7 +958,6 @@ public class HdfsTable extends Table { partitionIds_.remove(partitionId); partitionMap_.remove(partitionId); nameToPartitionMap_.remove(partition.getPartitionName()); - perPartitionFileDescMap_.remove(partition.getLocation()); for (int i = 0; i < partition.getPartitionValues().size(); ++i) { ColumnStats stats = getColumns().get(i).getStats(); LiteralExpr literal = partition.getPartitionValues().get(i); @@ -1055,8 +1029,7 @@ public class HdfsTable extends Table { * metadata will be updated from the Hive Metastore. * * If 'loadFileMetadata' is true, file metadata of the specified partitions are - * reloaded while reusing existing file descriptors to avoid loading metadata for files - * that haven't changed. If 'partitionsToUpdate' is not specified, file metadata of all + * reloaded from scratch. If 'partitionsToUpdate' is not specified, file metadata of all * the partitions are loaded. * * If 'loadTableSchema' is true, the table schema is loaded from the Hive Metastore. @@ -1148,10 +1121,9 @@ public class HdfsTable extends Table { org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable(); Preconditions.checkNotNull(msTbl); addDefaultPartition(msTbl.getSd()); - Map<FsKey, FileBlocksInfo> fileBlocksToLoad = Maps.newHashMap(); - HdfsPartition part = createPartition(msTbl.getSd(), null, fileBlocksToLoad); + HdfsPartition part = createPartition(msTbl.getSd(), null); addPartition(part); - loadDiskIds(fileBlocksToLoad); + loadMetadataAndDiskIds(part); if (isMarkedCached_) part.markCached(); } @@ -1452,10 +1424,8 @@ public class HdfsTable extends Table { msPartitions.addAll(MetaStoreUtil.fetchPartitionsByName(client, Lists.newArrayList(partitionNames), db_.getName(), name_)); - Map<FsKey, FileBlocksInfo> fileBlocksToLoad = Maps.newHashMap(); for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) { - HdfsPartition partition = - createPartition(msPartition.getSd(), msPartition, fileBlocksToLoad); + HdfsPartition partition = createPartition(msPartition.getSd(), msPartition); addPartition(partition); // If the partition is null, its HDFS path does not exist, and it was not added to // this table's partition list. Skip the partition. @@ -1473,8 +1443,8 @@ public class HdfsTable extends Table { // WRITE_ONLY the table's access level should be NONE. accessLevel_ = TAccessLevel.READ_ONLY; } + loadMetadataAndDiskIds(partition); } - loadDiskIds(fileBlocksToLoad); } /** @@ -1491,10 +1461,12 @@ public class HdfsTable extends Table { Preconditions.checkNotNull(msTbl); HdfsStorageDescriptor fileFormatDescriptor = HdfsStorageDescriptor.fromStorageDescriptor(this.name_, msTbl.getSd()); - Map<FsKey, FileBlocksInfo> perFsFileBlocks = Maps.newHashMap(); for (HdfsPartition partition: partitions) { org.apache.hadoop.hive.metastore.api.Partition msPart = partition.toHmsPartition(); + if (msPart != null) { + HdfsCachingUtil.validateCacheParams(msPart.getParameters()); + } StorageDescriptor sd = null; if (msPart == null) { // If this partition is not stored in the Hive Metastore (e.g. default partition @@ -1504,10 +1476,8 @@ public class HdfsTable extends Table { } else { sd = msPart.getSd(); } - loadPartitionFileMetadata(sd, partition, fileFormatDescriptor.getFileFormat(), - perFsFileBlocks); + loadPartitionFileMetadata(sd, partition); } - loadDiskIds(perFsFileBlocks); } /** @@ -1517,82 +1487,19 @@ public class HdfsTable extends Table { * 'perFsFileBlocks' with file block info and updates table metadata. 
*/ private void loadPartitionFileMetadata(StorageDescriptor storageDescriptor, - HdfsPartition partition, HdfsFileFormat fileFormat, - Map<FsKey, FileBlocksInfo> perFsFileBlocks) throws Exception { + HdfsPartition partition) throws Exception { Preconditions.checkNotNull(storageDescriptor); Preconditions.checkNotNull(partition); org.apache.hadoop.hive.metastore.api.Partition msPart = partition.toHmsPartition(); - boolean isMarkedCached = isMarkedCached_; - if (msPart != null) { - isMarkedCached = HdfsCachingUtil.validateCacheParams(msPart.getParameters()); - } Path partDirPath = new Path(storageDescriptor.getLocation()); FileSystem fs = partDirPath.getFileSystem(CONF); if (!fs.exists(partDirPath)) return; - String partitionDir = partDirPath.toString(); numHdfsFiles_ -= partition.getNumFileDescriptors(); totalHdfsBytes_ -= partition.getSize(); Preconditions.checkState(numHdfsFiles_ >= 0 && totalHdfsBytes_ >= 0); - updatePartitionFds(partDirPath, isMarkedCached, fileFormat, perFsFileBlocks); - List<FileDescriptor> fileDescs = Lists.newArrayList( - perPartitionFileDescMap_.get(partDirPath.toString()).values()); - partition.setFileDescriptors(fileDescs); - totalHdfsBytes_ += partition.getSize(); - numHdfsFiles_ += fileDescs.size(); - } - - /** - * Updates the file descriptors of a partition directory specified by 'partitionPath' - * and loads block metadata of new/modified files. Reuses existing FileDescriptors for - * unchanged files (indicated by unchanged mtime). The one exception is if the - * partition is marked as cached (HDFS caching) in which case the block metadata - * cannot be reused. Otherwise, creates new FileDescriptors and adds them to - * perPartitionFileDescMap_. 'fileFomat' is the file format of the files in this - * partition directory. 'perFsFileBlocks' is populated with the loaded block metadata. - */ - private void updatePartitionFds(Path partitionPath, - boolean isMarkedCached, HdfsFileFormat fileFormat, - Map<FsKey, FileBlocksInfo> perFsFileBlocks) throws CatalogException { - Preconditions.checkNotNull(partitionPath); - String partPathStr = partitionPath.toString(); - try { - FileSystem fs = partitionPath.getFileSystem(CONF); - if (!fs.exists(partitionPath)) { - perPartitionFileDescMap_.put( - partPathStr, Maps.<String, FileDescriptor>newHashMap()); - return; - } - Map<String, FileDescriptor> fileDescMap = - perPartitionFileDescMap_.get(partPathStr); - Map<String, FileDescriptor> newFileDescMap = Maps.newHashMap(); - // Get all the files in the partition directory - for (FileStatus fileStatus: fs.listStatus(partitionPath)) { - String fileName = fileStatus.getPath().getName().toString(); - if (fileStatus.isDirectory() || FileSystemUtil.isHiddenFile(fileName) || - HdfsCompression.fromFileName(fileName) == HdfsCompression.LZO_INDEX) { - // Ignore directory, hidden file starting with . or _, and LZO index files - // If a directory is erroneously created as a subdirectory of a partition dir - // we should ignore it and move on. Hive will not recurse into directories. - // Skip index files, these are read by the LZO scanner directly. - continue; - } - FileDescriptor fd = fileDescMap != null ? fileDescMap.get(fileName) : null; - if (fd == null || isMarkedCached || fd.getFileLength() != fileStatus.getLen() - || fd.getModificationTime() != fileStatus.getModificationTime()) { - // Metadata of cached or modified files are not reused. 
- fd = new FileDescriptor(fileName, fileStatus.getLen(), - fileStatus.getModificationTime()); - loadBlockMetadata(fs, fileStatus, fd, fileFormat, perFsFileBlocks); - } - newFileDescMap.put(fileName, fd); - } - perPartitionFileDescMap_.put(partPathStr, newFileDescMap); - } catch (Exception e) { - throw new CatalogException("Failed to retrieve file descriptors from path " + - partitionPath, e); - } + loadMetadataAndDiskIds(partition); } @Override @@ -1697,6 +1604,7 @@ public class HdfsTable extends Table { public long getNumHdfsFiles() { return numHdfsFiles_; } public long getTotalHdfsBytes() { return totalHdfsBytes_; } public String getHdfsBaseDir() { return hdfsBaseDir_; } + public Path getHdfsBaseDirPath() { return new Path(hdfsBaseDir_); } public boolean isAvroTable() { return avroSchema_ != null; } /** @@ -2022,7 +1930,7 @@ public class HdfsTable extends Table { */ public void reloadPartition(HdfsPartition oldPartition, Partition hmsPartition) throws CatalogException { - HdfsPartition refreshedPartition = createPartition( + HdfsPartition refreshedPartition = createAndLoadPartition( hmsPartition.getSd(), hmsPartition); Preconditions.checkArgument(oldPartition == null || oldPartition.compareTo(refreshedPartition) == 0); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6662c553/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java index 71eea88..4767837 100644 --- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java +++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.client.HdfsAdmin; import org.apache.hadoop.hdfs.protocol.EncryptionZone; +import org.apache.impala.catalog.HdfsCompression; import org.apache.log4j.Logger; import com.google.common.base.Preconditions; @@ -280,12 +281,25 @@ public class FileSystemUtil { } /** - * Returns true if the filesystem might override getFileBlockLocations(). + * Returns true if the file corresponding to 'fileStatus' is a valid data file as + * per Impala's partitioning rules. A fileStatus is considered invalid if its a + * directory/hidden file/LZO index file. LZO index files are skipped because they are + * read by the scanner directly. Currently Impala doesn't allow subdirectories in the + * partition paths. */ - public static boolean hasGetFileBlockLocations(FileSystem fs) { + public static boolean isValidDataFile(FileStatus fileStatus) { + String fileName = fileStatus.getPath().getName(); + return !(fileStatus.isDirectory() || FileSystemUtil.isHiddenFile(fileName) || + HdfsCompression.fromFileName(fileName) == HdfsCompression.LZO_INDEX); + } + + /** + * Returns true if the filesystem supports storage UUIDs in BlockLocation calls. + */ + public static boolean supportsStorageIds(FileSystem fs) { // Common case. if (isDistributedFileSystem(fs)) return true; - // Blacklist FileSystems that are known to not implement getFileBlockLocations(). + // Blacklist FileSystems that are known to not to include storage UUIDs. return !(fs instanceof S3AFileSystem || fs instanceof LocalFileSystem); } @@ -409,6 +423,16 @@ public class FileSystemUtil { } /** + * Returns true if Path 'p' is a descendant of Path 'parent', false otherwise. 
+ */ + public static boolean isDescendantPath(Path p, Path parent) { + if (p == null || parent == null) return false; + while (!p.isRoot() && p.depth() != parent.depth()) p = p.getParent(); + if (p.isRoot()) return false; + return p.equals(parent); + } + + /** * Returns the configuration. */ public static Configuration getConfiguration() { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6662c553/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 4087818..f878b12 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -602,7 +602,8 @@ public class CatalogOpExecutor { throw new CatalogException("Table " + tbl.getFullName() + " is not an HDFS table"); } HdfsTable hdfsTable = (HdfsTable) tbl; - HdfsPartition hdfsPartition = hdfsTable.createPartition(partition.getSd(), partition); + HdfsPartition hdfsPartition = + hdfsTable.createAndLoadPartition(partition.getSd(), partition); return catalog_.addPartition(hdfsPartition); }
