IMPALA-7140 (part 5): support fetching file info for FS tables. This adds support for fetching file information and creating file descriptors.
With this patch, I'm able to connect and run queries. Most planner tests still fail because of missing column stats resulting in different join orders compared to the existing implementation. Change-Id: I42d67ab754872fad094c7dacdd2e1182de1bf3e8 Reviewed-on: http://gerrit.cloudera.org:8080/10749 Reviewed-by: Vuk Ercegovac <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/64e67198 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/64e67198 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/64e67198 Branch: refs/heads/master Commit: 64e6719870db5602a6fa85014bc6c264080b9414 Parents: 011acd0 Author: Todd Lipcon <[email protected]> Authored: Fri Jun 15 15:34:30 2018 -0700 Committer: Todd Lipcon <[email protected]> Committed: Tue Jun 26 04:27:33 2018 +0000 ---------------------------------------------------------------------- .../org/apache/impala/catalog/FeFsTable.java | 5 -- .../apache/impala/catalog/HdfsPartition.java | 4 +- .../org/apache/impala/catalog/HdfsTable.java | 68 ++++++++++----- .../catalog/local/DirectMetaProvider.java | 19 +++++ .../impala/catalog/local/LocalFsPartition.java | 90 +++++++++++++++++--- .../impala/catalog/local/LocalFsTable.java | 25 +++--- .../impala/catalog/local/MetaProvider.java | 9 ++ .../impala/catalog/local/LocalCatalogTest.java | 18 ++++ 8 files changed, 188 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/64e67198/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java index 508a0e5..73a7ffe 100644 --- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java +++ 
b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java @@ -90,11 +90,6 @@ public interface FeFsTable extends FeTable { long getTotalHdfsBytes(); /** - * @return the total number of files stored on HDFS for this table - */ - long getTotalNumFiles(); - - /** * @return true if this table is backed by the Avro file format */ boolean isAvroTable(); http://git-wip-us.apache.org/repos/asf/impala/blob/64e67198/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java index ff40713..f5e0654 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java @@ -127,10 +127,8 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition { /** * Creates the file descriptor of a file represented by 'fileStatus' that * resides in a filesystem that doesn't support the BlockLocation API (e.g. S3). - * 'fileFormat' is the file format of the partition where this file resides. 
*/ - public static FileDescriptor createWithNoBlocks( - FileStatus fileStatus, HdfsFileFormat fileFormat) { + public static FileDescriptor createWithNoBlocks(FileStatus fileStatus) { FlatBufferBuilder fbb = new FlatBufferBuilder(1); return new FileDescriptor(createFbFileDesc(fbb, fileStatus, null, false)); } http://git-wip-us.apache.org/repos/asf/impala/blob/64e67198/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index 1f20cc8..69ecb6c 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -285,7 +285,7 @@ public class HdfsTable extends Table implements FeFsTable { BackendConfig.INSTANCE.maxNonHdfsPartsParallelLoad(); // File/Block metadata loading stats for a single HDFS path. - private class FileMetadataLoadStats { + public static class FileMetadataLoadStats { // Path corresponding to this metadata load request. private final Path hdfsPath; @@ -410,10 +410,40 @@ public class HdfsTable extends Table implements FeFsTable { } FileSystem fs = partDir.getFileSystem(CONF); - boolean supportsBlocks = FileSystemUtil.supportsStorageIds(fs); + RemoteIterator<LocatedFileStatus> fileStatusIter = FileSystemUtil.listFiles(fs, partDir, false); if (fileStatusIter == null) return loadStats; + + List<FileDescriptor> newFileDescs = createFileDescriptors( + fs, fileStatusIter, hostIndex_, loadStats); + for (HdfsPartition partition: partitions) { + partition.setFileDescriptors(newFileDescs); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Loaded file metadata for " + getFullName() + " " + + loadStats.debugString()); + } + return loadStats; + } + + /** + * Convert LocatedFileStatuses to FileDescriptors. 
+ * + * If 'fs' is a FileSystem that supports block locations, the resulting + * descriptors include location information, and 'hostIndex' is updated + * to include all of the hosts referred to by the locations. + * + * 'loadStats' is updated to reflect this loading operation. + * + * May throw IOException if the provided RemoteIterator throws. + */ + public static List<FileDescriptor> createFileDescriptors( + FileSystem fs, + RemoteIterator<LocatedFileStatus> fileStatusIter, + ListMap<TNetworkAddress> hostIndex, + FileMetadataLoadStats loadStats) throws IOException { + boolean supportsBlocks = FileSystemUtil.supportsStorageIds(fs); Reference<Long> numUnknownDiskIds = new Reference<Long>(Long.valueOf(0)); List<FileDescriptor> newFileDescs = Lists.newArrayList(); while (fileStatusIter.hasNext()) { @@ -425,21 +455,15 @@ public class HdfsTable extends Table implements FeFsTable { FileDescriptor fd; if (supportsBlocks) { fd = FileDescriptor.create(fileStatus, fileStatus.getBlockLocations(), fs, - hostIndex_, HdfsShim.isErasureCoded(fileStatus), numUnknownDiskIds); + hostIndex, HdfsShim.isErasureCoded(fileStatus), numUnknownDiskIds); } else { - fd = FileDescriptor.createWithNoBlocks( - fileStatus, partitions.get(0).getFileFormat()); + fd = FileDescriptor.createWithNoBlocks(fileStatus); } newFileDescs.add(fd); ++loadStats.loadedFiles; } - for (HdfsPartition partition: partitions) partition.setFileDescriptors(newFileDescs); loadStats.unknownDiskIds += numUnknownDiskIds.getRef(); - if (LOG.isTraceEnabled()) { - LOG.trace("Loaded file metadata for " + getFullName() + " " + - loadStats.debugString()); - } - return loadStats; + return newFileDescs; } /** @@ -499,8 +523,7 @@ public class HdfsTable extends Table implements FeFsTable { fd = FileDescriptor.create(fileStatus, locations, fs, hostIndex_, HdfsShim.isErasureCoded(fileStatus), numUnknownDiskIds); } else { - fd = FileDescriptor.createWithNoBlocks( - fileStatus, partitions.get(0).getFileFormat()); + fd = 
FileDescriptor.createWithNoBlocks(fileStatus); } ++loadStats.loadedFiles; } else { @@ -1756,9 +1779,6 @@ public class HdfsTable extends Table implements FeFsTable { public long getTotalHdfsBytes() { return fileMetadataStats_.totalFileBytes; } @Override // FeFsTable - public long getTotalNumFiles() { return fileMetadataStats_.numFiles; } - - @Override // FeFsTable public String getHdfsBaseDir() { return hdfsBaseDir_; } public Path getHdfsBaseDirPath() { return new Path(hdfsBaseDir_); } @Override // FeFsTable @@ -1988,7 +2008,14 @@ public class HdfsTable extends Table implements FeFsTable { Collections.sort(orderedPartitions, HdfsPartition.KV_COMPARATOR); long totalCachedBytes = 0L; + long totalBytes = 0L; + long totalNumFiles = 0L; for (FeFsPartition p: orderedPartitions) { + int numFiles = p.getFileDescriptors().size(); + long size = p.getSize(); + totalNumFiles += numFiles; + totalBytes += size; + TResultRowBuilder rowBuilder = new TResultRowBuilder(); // Add the partition-key values (as strings for simplicity). @@ -2001,8 +2028,9 @@ public class HdfsTable extends Table implements FeFsTable { // Compute and report the extrapolated row count because the set of files could // have changed since we last computed stats for this partition. We also follow // this policy during scan-cardinality estimation. - if (statsExtrap) rowBuilder.add(table.getExtrapolatedNumRows(p.getSize())); - rowBuilder.add(p.getFileDescriptors().size()).addBytes(p.getSize()); + if (statsExtrap) rowBuilder.add(table.getExtrapolatedNumRows(size)); + + rowBuilder.add(numFiles).addBytes(size); if (!p.isMarkedCached()) { // Helps to differentiate partitions that have 0B cached versus partitions // that are not marked as cached. 
@@ -2056,8 +2084,8 @@ public class HdfsTable extends Table implements FeFsTable { if (statsExtrap) { rowBuilder.add(table.getExtrapolatedNumRows(table.getTotalHdfsBytes())); } - rowBuilder.add(table.getTotalNumFiles()) - .addBytes(table.getTotalHdfsBytes()) + rowBuilder.add(totalNumFiles) + .addBytes(totalBytes) .addBytes(totalCachedBytes).add("").add("").add("").add(""); result.addToRows(rowBuilder.get()); } http://git-wip-us.apache.org/repos/asf/impala/blob/64e67198/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java index e09f031..aadbf23 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java @@ -17,10 +17,16 @@ package org.apache.impala.catalog.local; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -46,6 +52,7 @@ import com.google.common.collect.Maps; */ class DirectMetaProvider implements MetaProvider { private static MetaStoreClientPool msClientPool_; + private static Configuration CONF = new Configuration(); DirectMetaProvider() { initMsClientPool(); @@ -152,4 +159,16 @@ class DirectMetaProvider implements MetaProvider { return ret; } + + @Override + public List<LocatedFileStatus> loadFileMetadata(Path dir) throws IOException { + Preconditions.checkNotNull(dir); + 
Preconditions.checkArgument(dir.isAbsolute(), + "Must pass absolute path: %s", dir); + FileSystem fs = dir.getFileSystem(CONF); + RemoteIterator<LocatedFileStatus> it = fs.listFiles(dir, /*recursive=*/false); + ImmutableList.Builder<LocatedFileStatus> b = new ImmutableList.Builder<>(); + while (it.hasNext()) b.add(it.next()); + return b.build(); + } } http://git-wip-us.apache.org/repos/asf/impala/blob/64e67198/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java index effe7c2..c5510d5 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java @@ -17,11 +17,17 @@ package org.apache.impala.catalog.local; -import java.util.Collections; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.impala.analysis.LiteralExpr; import org.apache.impala.catalog.FeCatalogUtils; @@ -32,18 +38,22 @@ import org.apache.impala.catalog.HdfsPartition; import org.apache.impala.catalog.HdfsPartition.FileDescriptor; import org.apache.impala.catalog.HdfsStorageDescriptor; import org.apache.impala.catalog.HdfsStorageDescriptor.InvalidStorageDescriptorException; +import org.apache.impala.catalog.HdfsTable; import org.apache.impala.catalog.PartitionStatsUtil; import org.apache.impala.thrift.TAccessLevel; import org.apache.impala.thrift.THdfsPartitionLocation; import 
org.apache.impala.thrift.TPartitionStats; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; public class LocalFsPartition implements FeFsPartition { + private static final Configuration CONF = new Configuration(); private final LocalFsTable table_; private final LocalPartitionSpec spec_; private final Partition msPartition_; + private ImmutableList<FileDescriptor> fileDescriptors_; public LocalFsPartition(LocalFsTable table, LocalPartitionSpec spec, Partition msPartition) { @@ -69,22 +79,20 @@ public class LocalFsPartition implements FeFsPartition { @Override public List<FileDescriptor> getFileDescriptors() { - // TODO(todd): implement me - return Collections.emptyList(); + loadFileDescriptors(); + return fileDescriptors_; } @Override public boolean hasFileDescriptors() { - // TODO(todd): implement file fetching. Return true for now - // so that partition pruning can be tested -- if we return false - // then all partitions would be pruned. 
- return true; + loadFileDescriptors(); + return !fileDescriptors_.isEmpty(); } @Override public int getNumFileDescriptors() { - // TODO Auto-generated method stub - return 0; + loadFileDescriptors(); + return fileDescriptors_.size(); } @Override @@ -161,8 +169,12 @@ public class LocalFsPartition implements FeFsPartition { @Override public long getSize() { - // TODO Auto-generated method stub - return 0; + loadFileDescriptors(); + long size = 0; + for (FileDescriptor fd : fileDescriptors_) { + size += fd.getFileLength(); + } + return size; } @Override @@ -203,4 +215,60 @@ public class LocalFsPartition implements FeFsPartition { return Maps.filterKeys(getParameters(), HdfsPartition.IS_NOT_INCREMENTAL_STATS_KEY); } + + + private void loadFileDescriptors() { + if (fileDescriptors_ != null) return; + Path partDir = getLocationPath(); + List<LocatedFileStatus> stats; + try { + stats = table_.db_.getCatalog().getMetaProvider().loadFileMetadata(partDir); + } catch (FileNotFoundException fnf) { + // If the partition directory isn't found, this is treated as having no + // files. + fileDescriptors_ = ImmutableList.of(); + return; + } catch (IOException ioe) { + throw new LocalCatalogException(String.format( + "Could not load files for partition %s of table %s", + spec_.getName(), table_.getFullName()), ioe); + } + + HdfsTable.FileMetadataLoadStats loadStats = + new HdfsTable.FileMetadataLoadStats(partDir); + + try { + FileSystem fs = partDir.getFileSystem(CONF); + fileDescriptors_ = ImmutableList.copyOf( + HdfsTable.createFileDescriptors(fs, new FakeRemoteIterator<>(stats), + table_.getHostIndex(), loadStats)); + } catch (IOException e) { + throw new LocalCatalogException(String.format( + "Could not convert files to descriptors for partition %s of table %s", + spec_.getName(), table_.getFullName()), e); + } + } + + /** + * Wrapper for a normal Iterable<T> to appear like a Hadoop RemoteIterator<T>. 
+ * This is necessary because the existing code to convert file statuses to + * descriptors consumes the remote iterator directly and thus avoids materializing + * all of the LocatedFileStatus objects in memory at the same time. + */ + private static class FakeRemoteIterator<T> implements RemoteIterator<T> { + private final Iterator<T> it_; + + FakeRemoteIterator(Iterable<T> it) { + this.it_ = it.iterator(); + } + @Override + public boolean hasNext() throws IOException { + return it_.hasNext(); + } + + @Override + public T next() throws IOException { + return it_.next(); + } + } } http://git-wip-us.apache.org/repos/asf/impala/blob/64e67198/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java index f648ac8..6871f90 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java @@ -78,6 +78,12 @@ public class LocalFsTable extends LocalTable implements FeFsTable { */ private ArrayList<HashSet<Long>> nullPartitionIds_; + /** + * Map assigning integer indexes for the hosts containing blocks for this table. + * This is updated as a side effect of LocalFsPartition.loadFileDescriptors(). + */ + private final ListMap<TNetworkAddress> hostIndex_ = new ListMap<>(); + public LocalFsTable(LocalDb db, String tblName, SchemaInfo schemaInfo) { super(db, tblName, schemaInfo); } @@ -113,7 +119,7 @@ public class LocalFsTable extends LocalTable implements FeFsTable { @Override public TResultSet getFiles(List<List<TPartitionKeyValue>> partitionSet) throws CatalogException { - // TODO(todd): implement fetching files from HDFS + // TODO(todd): implement for SHOW FILES. 
return null; } @@ -125,14 +131,12 @@ public class LocalFsTable extends LocalTable implements FeFsTable { @Override public long getTotalHdfsBytes() { - // TODO Auto-generated method stub - return 0; - } - - @Override - public long getTotalNumFiles() { - // TODO Auto-generated method stub - return 0; + // TODO(todd): this is slow because it requires loading all partitions. Remove if possible. + long size = 0; + for (FeFsPartition p: loadPartitions(getPartitionIds())) { + size += p.getSize(); + } + return size; } @Override @@ -387,7 +391,6 @@ public class LocalFsTable extends LocalTable implements FeFsTable { @Override public ListMap<TNetworkAddress> getHostIndex() { - // TODO(todd): implement me - return new ListMap<>(); + return hostIndex_; } } http://git-wip-us.apache.org/repos/asf/impala/blob/64e67198/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java index 42bf40e..f59a7c9 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java @@ -17,9 +17,12 @@ package org.apache.impala.catalog.local; +import java.io.IOException; import java.util.List; import java.util.Map; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; @@ -65,4 +68,10 @@ interface MetaProvider { Map<String, Partition> loadPartitionsByNames(String dbName, String tableName, List<String> partitionColumnNames, List<String> partitionNames) throws MetaException, TException; + + /** + * Load file metadata and block locations for the files in the given + * partition directory. 
+ */ + List<LocatedFileStatus> loadFileMetadata(Path dir) throws IOException; } http://git-wip-us.apache.org/repos/asf/impala/blob/64e67198/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java index aabb873..a242b1c 100644 --- a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java @@ -19,6 +19,7 @@ package org.apache.impala.catalog.local; import static org.junit.Assert.*; +import java.util.List; import java.util.Set; import org.apache.impala.catalog.CatalogTest; @@ -27,6 +28,7 @@ import org.apache.impala.catalog.FeDb; import org.apache.impala.catalog.FeFsPartition; import org.apache.impala.catalog.FeFsTable; import org.apache.impala.catalog.FeTable; +import org.apache.impala.catalog.HdfsPartition.FileDescriptor; import org.apache.impala.catalog.Type; import org.apache.impala.thrift.TResultSet; import org.apache.impala.util.MetaStoreUtil; @@ -131,4 +133,20 @@ public class LocalCatalogTest { t, Iterables.getOnlyElement(ids)); assertTrue(partition.getPartitionValue(dayCol).isNullLiteral()); } + + @Test + public void testLoadFileDescriptors() throws Exception { + FeFsTable t = (FeFsTable) catalog_.getTable("functional", "alltypes"); + int totalFds = 0; + for (FeFsPartition p: FeCatalogUtils.loadAllPartitions(t)) { + List<FileDescriptor> fds = p.getFileDescriptors(); + totalFds += fds.size(); + for (FileDescriptor fd : fds) { + assertTrue(fd.getFileLength() > 0); + assertEquals(fd.getNumFileBlocks(), 1); + assertEquals(3, fd.getFbFileBlock(0).diskIdsLength()); + } + } + assertEquals(24, totalFds); + } }
