This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 2fd795cf56e65a43087375867dcc9890e3a27330 Author: Yongzhi Chen <yc...@cloudera.com> AuthorDate: Fri Apr 5 09:12:29 2019 -0400 IMPALA-7322: Add storage wait time to profile Add metrics to record the storage wait time of catalog operations that load metadata for HDFS, Kudu and HBase tables. Pass the storage wait time from the catalog to the frontend through Thrift and log the total storage load time in the query profile. Storage-load-time is the amount of time spent loading metadata from the underlying storage layer (e.g. S3, HDFS, Kudu, HBase); it does not include the time spent loading metadata from HMS. Testing: Ran queries that trigger loading of all, none, or some of the related tables. Checked the query profile for each query. Checked the catalog metrics for each table. Added tests to test_observability.py. Ran all core tests. Sample output: Profile (storage-load-time is the added property) after running an HBase query (the "Metadata load finished" event is split across several lines because of commit-message line-length limits): Query Compilation: 4s401ms - Metadata load started: 661.084us (661.084us) - Metadata load finished. loaded-tables=1/1 load-requests=1 catalog-updates=3 storage-load-time=233ms: 3s819ms (3s819ms) - Analysis finished: 3s820ms (763.979us) - Value transfer graph computed: 3s820ms (63.193us) Catalog metrics (this sample is from an HDFS table): storage-metadata-load-duration: Count: 1 Mean rate: 0.0085 1 min. rate: 0.032 5 min. rate: 0.1386 15 min. 
rate: 0.177 Min (msec): 111 Max (msec): 111 Mean (msec): 111.1802 Median (msec): 111.1802 75th-% (msec): 111.1802 95th-% (msec): 111.1802 99th-% (msec): 111.1802 Change-Id: I6dde7e394b7c1c396d835ef6aa0a55930c0a8660 Reviewed-on: http://gerrit.cloudera.org:8080/12940 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- common/thrift/CatalogObjects.thrift | 4 ++ .../apache/impala/analysis/StmtMetadataLoader.java | 22 ++++++- .../java/org/apache/impala/catalog/HBaseTable.java | 17 ++++-- .../java/org/apache/impala/catalog/HdfsTable.java | 67 ++++++++++++++++------ .../java/org/apache/impala/catalog/KuduTable.java | 9 ++- .../main/java/org/apache/impala/catalog/Table.java | 12 ++++ tests/query_test/test_observability.py | 29 ++++++++++ 7 files changed, 131 insertions(+), 29 deletions(-) diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift index 01bba1d..1152ecf 100644 --- a/common/thrift/CatalogObjects.thrift +++ b/common/thrift/CatalogObjects.thrift @@ -490,6 +490,10 @@ struct TTable { // For example ValidReaderWriteIdList object's format is: // <table_name>:<highwatermark>:<minOpenWriteId>:<open_writeids>:<abort_writeids> 14: optional string valid_write_ids + + // Set if this table needs storage access during metadata load. + // Time used for storage loading in nanoseconds. + 15: optional i64 storage_metadata_load_time } // Represents a database. 
diff --git a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java index fd11af1..bfe041b 100644 --- a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java +++ b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java @@ -18,6 +18,7 @@ package org.apache.impala.analysis; import java.util.ArrayList; +import java.util.concurrent.TimeUnit; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -28,6 +29,7 @@ import org.apache.impala.catalog.FeCatalog; import org.apache.impala.catalog.FeDb; import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.FeView; +import org.apache.impala.catalog.Table; import org.apache.impala.common.InternalException; import org.apache.impala.compat.MetastoreShim; import org.apache.impala.service.Frontend; @@ -224,11 +226,25 @@ public class StmtMetadataLoader { missingTbls = newMissingTbls; ++numCatalogUpdatesReceived_; } + if (timeline_ != null) { - timeline_.markEvent(String.format("Metadata load finished. " + - "loaded-tables=%d/%d load-requests=%d catalog-updates=%d", + long storageLoadTimeNano = 0; + // Calculate the total storage loading time for this query (not including + // the tables already loaded before the query was called). + storageLoadTimeNano = + loadedTbls_.values() + .stream() + .filter(Table.class::isInstance) + .map(Table.class::cast) + .filter(loadedTbl -> requestedTbls.contains(loadedTbl.getTableName())) + .mapToLong(Table::getStorageLoadTime) + .sum(); + timeline_.markEvent(String.format("Metadata load finished. 
" + + "loaded-tables=%d/%d load-requests=%d catalog-updates=%d " + + "storage-load-time=%dms", requestedTbls.size(), loadedTbls_.size(), numLoadRequestsSent_, - numCatalogUpdatesReceived_)); + numCatalogUpdatesReceived_, + TimeUnit.MILLISECONDS.convert(storageLoadTimeNano, TimeUnit.NANOSECONDS))); if (MetastoreShim.getMajorVersion() > 2) { StringBuilder validIdsBuf = new StringBuilder("Loaded ValidWriteIdLists: "); diff --git a/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java b/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java index d75afa7..6810be9 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java @@ -103,11 +103,18 @@ public class HBaseTable extends Table implements FeHBaseTable { Preconditions.checkNotNull(getMetaStoreTable()); try (Timer.Context timer = getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time()) { msTable_ = msTbl; - hbaseTableName_ = Util.getHBaseTableName(getMetaStoreTable()); - // Warm up the connection and verify the table exists. - Util.getHBaseTable(hbaseTableName_).close(); - columnFamilies_ = null; - List<Column> cols = Util.loadColumns(msTable_); + final Timer.Context storageLoadTimer = + getMetrics().getTimer(Table.STORAGE_METADATA_LOAD_DURATION_METRIC).time(); + List<Column> cols; + try { + hbaseTableName_ = Util.getHBaseTableName(getMetaStoreTable()); + // Warm up the connection and verify the table exists. + Util.getHBaseTable(hbaseTableName_).close(); + columnFamilies_ = null; + cols = Util.loadColumns(msTable_); + } finally { + storageMetadataLoadTime_ = storageLoadTimer.stop(); + } clearColumns(); for (Column col : cols) addColumn(col); // Set table stats. 
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index 779a96e..08ab6e5 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.TimeUnit; import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; @@ -87,6 +88,7 @@ import org.apache.impala.util.ThreadNameAnnotator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.codahale.metrics.Clock; import com.codahale.metrics.Gauge; import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; @@ -504,16 +506,18 @@ public class HdfsTable extends Table implements FeFsTable { * Create HdfsPartition objects corresponding to 'msPartitions' and add them to this * table's partition list. Any partition metadata will be reset and loaded from * scratch. For each partition created, we load the block metadata for each data file - * under it. + * under it. Returns time spent loading the filesystem metadata in nanoseconds. * * If there are no partitions in the Hive metadata, a single partition is added with no * partition keys. */ - private void loadAllPartitions( + private long loadAllPartitions( List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions, org.apache.hadoop.hive.metastore.api.Table msTbl) throws IOException, CatalogException { Preconditions.checkNotNull(msTbl); + final Clock clock = Clock.defaultClock(); + long startTime = clock.getTick(); initializePartitionMetadata(msTbl); FsPermissionCache permCache = preloadPermissionsCache(msPartitions); @@ -541,6 +545,7 @@ public class HdfsTable extends Table implements FeFsTable { } // Load the file metadata from scratch. 
loadFileMetadataForPartitions(partitionMap_.values(), /*isRefresh=*/false); + return clock.getTick() - startTime; } @@ -926,7 +931,10 @@ public class HdfsTable extends Table implements FeFsTable { loadTableSchema ? "table definition and " : "", partitionsToUpdate == null ? "all" : String.valueOf(partitionsToUpdate.size()), msTbl.getDbName(), msTbl.getTableName(), reason); - LOG.info(annotation);; + LOG.info(annotation); + final Timer storageLdTimer = + getMetrics().getTimer(Table.STORAGE_METADATA_LOAD_DURATION_METRIC); + storageMetadataLoadTime_ = 0; try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation)) { // turn all exceptions into TableLoadingException msTable_ = msTbl; @@ -940,16 +948,19 @@ public class HdfsTable extends Table implements FeFsTable { //TODO writeIDs may also be loaded in other code paths. loadValidWriteIdList(client); } + // Load partition and file metadata if (reuseMetadata) { // Incrementally update this table's partitions and file metadata Preconditions.checkState( partitionsToUpdate == null || loadParitionFileMetadata); - updateMdFromHmsTable(msTbl); + storageMetadataLoadTime_ += updateMdFromHmsTable(msTbl); if (msTbl.getPartitionKeysSize() == 0) { - if (loadParitionFileMetadata) updateUnpartitionedTableFileMd(); + if (loadParitionFileMetadata) { + storageMetadataLoadTime_ += updateUnpartitionedTableFileMd(); + } } else { - updatePartitionsFromHms( + storageMetadataLoadTime_ += updatePartitionsFromHms( client, partitionsToUpdate, loadParitionFileMetadata); } LOG.info("Incrementally loaded table metadata for: " + getFullName()); @@ -960,7 +971,7 @@ public class HdfsTable extends Table implements FeFsTable { MetaStoreUtil.fetchAllPartitions( client, db_.getName(), name_, NUM_PARTITION_FETCH_RETRIES); LOG.info("Fetched partition metadata from the Metastore: " + getFullName()); - loadAllPartitions(msPartitions, msTbl); + storageMetadataLoadTime_ = loadAllPartitions(msPartitions, msTbl); } if (loadTableSchema) setAvroSchema(client, 
msTbl); setTableStats(msTbl); @@ -973,37 +984,47 @@ public class HdfsTable extends Table implements FeFsTable { + getFullName(), e); } } finally { + storageLdTimer.update(storageMetadataLoadTime_, TimeUnit.NANOSECONDS); context.stop(); } } /** * Updates the table metadata, including 'hdfsBaseDir_', 'isMarkedCached_', - * and 'accessLevel_' from 'msTbl'. Throws an IOException if there was an error - * accessing the table location path. + * and 'accessLevel_' from 'msTbl'. Returns time spent accessing file system + * in nanoseconds. Throws an IOException if there was an error accessing + * the table location path. */ - private void updateMdFromHmsTable(org.apache.hadoop.hive.metastore.api.Table msTbl) + private long updateMdFromHmsTable(org.apache.hadoop.hive.metastore.api.Table msTbl) throws IOException { Preconditions.checkNotNull(msTbl); + final Clock clock = Clock.defaultClock(); + long filesystemAccessTime = 0; + long startTime = clock.getTick(); hdfsBaseDir_ = msTbl.getSd().getLocation(); isMarkedCached_ = HdfsCachingUtil.validateCacheParams(msTbl.getParameters()); Path location = new Path(hdfsBaseDir_); accessLevel_ = getAvailableAccessLevel(getFullName(), location, new FsPermissionCache()); + filesystemAccessTime = clock.getTick() - startTime; setMetaStoreTable(msTbl); + return filesystemAccessTime; } /** * Incrementally updates the file metadata of an unpartitioned HdfsTable. + * Returns time spent updating the file metadata in nanoseconds. * * This is optimized for the case where few files have changed. See * {@link #refreshFileMetadata(Path, List)} above for details. 
*/ - private void updateUnpartitionedTableFileMd() throws CatalogException { + private long updateUnpartitionedTableFileMd() throws CatalogException { Preconditions.checkState(getNumClusteringCols() == 0); if (LOG.isTraceEnabled()) { LOG.trace("update unpartitioned table: " + getFullName()); } + final Clock clock = Clock.defaultClock(); + long startTime = clock.getTick(); HdfsPartition oldPartition = Iterables.getOnlyElement(partitionMap_.values()); // Instead of updating the existing partition in place, we create a new one @@ -1021,18 +1042,21 @@ public class HdfsTable extends Table implements FeFsTable { addPartition(part); if (isMarkedCached_) part.markCached(); loadFileMetadataForPartitions(ImmutableList.of(part), /*isRefresh=*/true); + return clock.getTick() - startTime; } /** - * Updates the partitions of an HdfsTable so that they are in sync with the Hive - * Metastore. It reloads partitions that were marked 'dirty' by doing a DROP + CREATE. - * It removes from this table partitions that no longer exist in the Hive Metastore and - * adds partitions that were added externally (e.g. using Hive) to the Hive Metastore - * but do not exist in this table. If 'loadParitionFileMetadata' is true, it triggers - * file/block metadata reload for the partitions specified in 'partitionsToUpdate', if - * any, or for all the table partitions if 'partitionsToUpdate' is null. + * Updates the partitions of an HdfsTable so that they are in sync with the + * Hive Metastore. It reloads partitions that were marked 'dirty' by doing a + * DROP + CREATE. It removes from this table partitions that no longer exist + * in the Hive Metastore and adds partitions that were added externally (e.g. + * using Hive) to the Hive Metastore but do not exist in this table. If + * 'loadParitionFileMetadata' is true, it triggers file/block metadata reload + * for the partitions specified in 'partitionsToUpdate', if any, or for all + * the table partitions if 'partitionsToUpdate' is null. 
Returns time + * spent loading file metadata in nanoseconds. */ - private void updatePartitionsFromHms(IMetaStoreClient client, + private long updatePartitionsFromHms(IMetaStoreClient client, Set<String> partitionsToUpdate, boolean loadPartitionFileMetadata) throws Exception { if (LOG.isTraceEnabled()) LOG.trace("Sync table partitions: " + getFullName()); @@ -1096,14 +1120,19 @@ public class HdfsTable extends Table implements FeFsTable { // Load file metadata. Until we have a notification mechanism for when a // file changes in hdfs, it is sometimes required to reload all the file // descriptors and block metadata of a table (e.g. REFRESH statement). + long fileLoadMdTime = 0; if (loadPartitionFileMetadata) { + final Clock clock = Clock.defaultClock(); + long startTime = clock.getTick(); if (partitionsToUpdate != null) { Preconditions.checkState(partitionsToLoadFiles.isEmpty()); // Only reload file metadata of partitions specified in 'partitionsToUpdate' partitionsToLoadFiles = getPartitionsForNames(partitionsToUpdate); } loadFileMetadataForPartitions(partitionsToLoadFiles, /* isRefresh=*/true); + fileLoadMdTime = clock.getTick() - startTime; } + return fileLoadMdTime; } /** diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java index 338c979..21feda7 100644 --- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java @@ -310,14 +310,19 @@ public class KuduTable extends Table implements FeKuduTable { " property found for Kudu table " + kuduTableName_); } setTableStats(msTable_); - // Load metadata from Kudu and HMS + // Load metadata from Kudu + final Timer.Context ctxStorageLdTime = + getMetrics().getTimer(Table.STORAGE_METADATA_LOAD_DURATION_METRIC).time(); try { loadSchemaFromKudu(); - loadAllColumnStats(msClient); } catch (ImpalaRuntimeException e) { throw new TableLoadingException("Error loading metadata for Kudu table 
" + kuduTableName_, e); + } finally { + storageMetadataLoadTime_ = ctxStorageLdTime.stop(); } + // Load from HMS + loadAllColumnStats(msClient); refreshLastUsedTime(); // Avoid updating HMS if the schema didn't change. if (msTable_.equals(msTbl)) return; diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java index b0264c2..acab108 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Table.java +++ b/fe/src/main/java/org/apache/impala/catalog/Table.java @@ -115,6 +115,9 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { // True if this object is stored in an Impalad catalog cache. protected boolean storedInImpaladCatalogCache_ = false; + // Time spent in the source systems loading/reloading the fs metadata for the table. + protected long storageMetadataLoadTime_ = 0; + // Last used time of this table in nanoseconds as returned by // CatalogdTableInvalidator.nanoTime(). This is only set in catalogd and not used by // impalad. @@ -138,6 +141,8 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { public static final String REFRESH_DURATION_METRIC = "refresh-duration"; public static final String ALTER_DURATION_METRIC = "alter-duration"; public static final String LOAD_DURATION_METRIC = "load-duration"; + public static final String STORAGE_METADATA_LOAD_DURATION_METRIC = + "storage-metadata-load-duration"; // Table property key for storing the time of the last DDL operation. public static final String TBL_PROP_LAST_DDL_TIME = "transient_lastDdlTime"; @@ -197,10 +202,14 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { metrics_.addTimer(REFRESH_DURATION_METRIC); metrics_.addTimer(ALTER_DURATION_METRIC); metrics_.addTimer(LOAD_DURATION_METRIC); + metrics_.addTimer(STORAGE_METADATA_LOAD_DURATION_METRIC); } public Metrics getMetrics() { return metrics_; } + // Returns storage wait time during metadata load. 
+ public long getStorageLoadTime() { return storageMetadataLoadTime_; } + // Returns true if this table reference comes from the impalad catalog cache or if it // is loaded from the testing framework. Returns false if this table reference points // to a table stored in the catalog server. @@ -399,6 +408,8 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { accessLevel_ = thriftTable.isSetAccess_level() ? thriftTable.getAccess_level() : TAccessLevel.READ_WRITE; + storageMetadataLoadTime_ = thriftTable.getStorage_metadata_load_time(); + storedInImpaladCatalogCache_ = true; validWriteIds_ = thriftTable.isSetValid_write_ids() ? thriftTable.getValid_write_ids() : null; @@ -435,6 +446,7 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { TTable table = new TTable(db_.getName(), name_); table.setAccess_level(accessLevel_); + table.setStorage_metadata_load_time(storageMetadataLoadTime_); // Populate both regular columns and clustering columns (if there are any). table.setColumns(new ArrayList<>()); diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py index fef2fef..62e1aeb 100644 --- a/tests/query_test/test_observability.py +++ b/tests/query_test/test_observability.py @@ -585,3 +585,32 @@ class TestObservability(ImpalaTestSuite): query = "select count (*) from functional.alltypes" runtime_profile = self.execute_query(query).runtime_profile self.__verify_profile_event_sequence(event_regexes, runtime_profile) + + def test_query_profile_storage_load_time_filesystem(self): + """Test that when a query needs load metadata for table(s), the + storage load time should be in the profile. 
Tests file systems.""" + self.__check_query_profile_storage_load_time("functional") + + @SkipIfS3.hbase + @SkipIfLocal.hbase + @SkipIfIsilon.hbase + @SkipIfABFS.hbase + @SkipIfADLS.hbase + def test_query_profile_storage_load_time(self): + """Test that when a query needs load metadata for table(s), the + storage load time should be in the profile. Tests kudu and hbase.""" + # KUDU table + self.__check_query_profile_storage_load_time("functional_kudu") + + # HBASE table + self.__check_query_profile_storage_load_time("functional_hbase") + + def __check_query_profile_storage_load_time(self, db_name): + """Check query profile for storage load time with a given database.""" + self.execute_query("invalidate metadata {0}.alltypes".format(db_name)) + query = "select count (*) from {0}.alltypes".format(db_name) + runtime_profile = self.execute_query(query).runtime_profile + assert "storage-load-time" in runtime_profile + # Call the second time, no metastore loading needed. + runtime_profile = self.execute_query(query).runtime_profile + assert "storage-load-time" not in runtime_profile