This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 2fd795cf56e65a43087375867dcc9890e3a27330
Author: Yongzhi Chen <yc...@cloudera.com>
AuthorDate: Fri Apr 5 09:12:29 2019 -0400

    IMPALA-7322: Add storage wait time to profile
    
    Add metrics to record the storage wait time for operations that
    load metadata in the catalog for HDFS, Kudu, and HBase tables.
    Pass the storage wait time from the catalog to the frontend via
    Thrift and log the total storage load time in the query profile.
    storage-load-time is the amount of time spent loading metadata
    from the underlying storage layer (e.g. S3, HDFS, Kudu, HBase);
    it does not include the time spent loading data from HMS.
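
    For reference, the measurement pattern used throughout the patch is
    the Codahale metrics Timer: time the storage access with a
    Timer.Context (which reports elapsed nanoseconds), keep the raw
    value on the table object, and convert to milliseconds only when
    formatting the profile event. A minimal, self-contained sketch of
    that pattern (class and member names here are illustrative, not the
    actual Impala code):

        import java.util.concurrent.TimeUnit;
        import com.codahale.metrics.MetricRegistry;
        import com.codahale.metrics.Timer;

        public class StorageLoadTimingSketch {
          private final MetricRegistry metrics = new MetricRegistry();
          // Analogous to Table.STORAGE_METADATA_LOAD_DURATION_METRIC.
          private final Timer storageLoadTimer =
              metrics.timer("storage-metadata-load-duration");
          // Analogous to Table.storageMetadataLoadTime_ (nanoseconds).
          private long storageLoadTimeNanos = 0;

          public void loadMetadata() {
            final Timer.Context ctx = storageLoadTimer.time();
            try {
              touchStorage();  // stand-in for the HDFS/Kudu/HBase access
            } finally {
              // Context.stop() records the sample and returns the
              // elapsed time in nanoseconds.
              storageLoadTimeNanos = ctx.stop();
            }
          }

          public String profileEvent() {
            // Convert to ms only at display time, as the timeline does.
            return String.format("storage-load-time=%dms",
                TimeUnit.MILLISECONDS.convert(storageLoadTimeNanos,
                    TimeUnit.NANOSECONDS));
          }

          private void touchStorage() { /* e.g. list files/partitions */ }
        }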
    
    Testing:
    Ran queries that trigger metadata loading for all, some, or none of
    the related tables. Checked the query profile for each query and
    the catalog metrics for each table.
    Added unit tests to test_observability.py.
    Ran all core tests.
    
    Sample output:
    Profile (storage-load-time is the added property) after running an
    HBase query (the 'Metadata load finished' event is split across
    several lines due to commit-message line-length limits):
    Query Compilation: 4s401ms
      - Metadata load started: 661.084us (661.084us)
      - Metadata load finished. loaded-tables=1/1
          load-requests=1 catalog-updates=3
          storage-load-time=233ms: 3s819ms (3s819ms)
      - Analysis finished: 3s820ms (763.979us)
      - Value transfer graph computed: 3s820ms (63.193us)
    Catalog metrics (this sample is from an HDFS table):
    storage-metadata-load-duration:
       Count: 1
       Mean rate: 0.0085
       1 min. rate: 0.032
       5 min. rate: 0.1386
       15 min. rate: 0.177
       Min (msec): 111
       Max (msec): 111
       Mean (msec): 111.1802
       Median (msec): 111.1802
       75th-% (msec): 111.1802
       95th-% (msec): 111.1802
       99th-% (msec): 111.1802
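
    The Count, rate, and percentile fields above come straight from the
    Codahale Timer's meter and histogram snapshot. A minimal sketch of
    recording and reading back such a timer, assuming a registry-backed
    metric like the one this patch adds (names are illustrative):

        import java.util.concurrent.TimeUnit;
        import com.codahale.metrics.MetricRegistry;
        import com.codahale.metrics.Snapshot;
        import com.codahale.metrics.Timer;

        public class TimerMetricSketch {
          public static void main(String[] args) {
            MetricRegistry registry = new MetricRegistry();
            Timer t = registry.timer("storage-metadata-load-duration");
            // Record one ~111ms storage load, in nanoseconds.
            t.update(111_180_200L, TimeUnit.NANOSECONDS);

            Snapshot s = t.getSnapshot();
            System.out.println("Count: " + t.getCount());
            // Rates are in events/second.
            System.out.println("Mean rate: " + t.getMeanRate());
            // Snapshot values are in the recorded unit (nanoseconds),
            // so divide by 1e6 to report milliseconds.
            System.out.println("Median (msec): " + s.getMedian() / 1e6);
            System.out.println("95th-% (msec): "
                + s.get95thPercentile() / 1e6);
          }
        }
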
    Change-Id: I6dde7e394b7c1c396d835ef6aa0a55930c0a8660
    Reviewed-on: http://gerrit.cloudera.org:8080/12940
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 common/thrift/CatalogObjects.thrift                |  4 ++
 .../apache/impala/analysis/StmtMetadataLoader.java | 22 ++++++-
 .../java/org/apache/impala/catalog/HBaseTable.java | 17 ++++--
 .../java/org/apache/impala/catalog/HdfsTable.java  | 67 ++++++++++++++++------
 .../java/org/apache/impala/catalog/KuduTable.java  |  9 ++-
 .../main/java/org/apache/impala/catalog/Table.java | 12 ++++
 tests/query_test/test_observability.py             | 29 ++++++++++
 7 files changed, 131 insertions(+), 29 deletions(-)

diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index 01bba1d..1152ecf 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -490,6 +490,10 @@ struct TTable {
   // For example ValidReaderWriteIdList object's format is:
  // <table_name>:<highwatermark>:<minOpenWriteId>:<open_writeids>:<abort_writeids>
   14: optional string valid_write_ids
+
+  // Set if this table needs storage access during metadata load.
+  // Time used for storage loading in nanoseconds.
+  15: optional i64 storage_metadata_load_time
 }
 
 // Represents a database.
diff --git a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
index fd11af1..bfe041b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
@@ -18,6 +18,7 @@
 package org.apache.impala.analysis;
 
 import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -28,6 +29,7 @@ import org.apache.impala.catalog.FeCatalog;
 import org.apache.impala.catalog.FeDb;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeView;
+import org.apache.impala.catalog.Table;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.service.Frontend;
@@ -224,11 +226,25 @@ public class StmtMetadataLoader {
       missingTbls = newMissingTbls;
       ++numCatalogUpdatesReceived_;
     }
+
     if (timeline_ != null) {
-      timeline_.markEvent(String.format("Metadata load finished. " +
-          "loaded-tables=%d/%d load-requests=%d catalog-updates=%d",
+      long storageLoadTimeNano = 0;
+      // Calculate the total storage loading time for this query (not including
+      // the tables already loaded before the query was called).
+      storageLoadTimeNano =
+          loadedTbls_.values()
+              .stream()
+              .filter(Table.class::isInstance)
+              .map(Table.class::cast)
+              .filter(loadedTbl -> requestedTbls.contains(loadedTbl.getTableName()))
+              .mapToLong(Table::getStorageLoadTime)
+              .sum();
+      timeline_.markEvent(String.format("Metadata load finished. "
+              + "loaded-tables=%d/%d load-requests=%d catalog-updates=%d "
+              + "storage-load-time=%dms",
           requestedTbls.size(), loadedTbls_.size(), numLoadRequestsSent_,
-          numCatalogUpdatesReceived_));
+          numCatalogUpdatesReceived_,
+          TimeUnit.MILLISECONDS.convert(storageLoadTimeNano, TimeUnit.NANOSECONDS)));
 
       if (MetastoreShim.getMajorVersion() > 2) {
        StringBuilder validIdsBuf = new StringBuilder("Loaded ValidWriteIdLists: ");
diff --git a/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java b/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
index d75afa7..6810be9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
@@ -103,11 +103,18 @@ public class HBaseTable extends Table implements FeHBaseTable {
     Preconditions.checkNotNull(getMetaStoreTable());
    try (Timer.Context timer = getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time()) {
       msTable_ = msTbl;
-      hbaseTableName_ = Util.getHBaseTableName(getMetaStoreTable());
-      // Warm up the connection and verify the table exists.
-      Util.getHBaseTable(hbaseTableName_).close();
-      columnFamilies_ = null;
-      List<Column> cols = Util.loadColumns(msTable_);
+      final Timer.Context storageLoadTimer =
+          getMetrics().getTimer(Table.STORAGE_METADATA_LOAD_DURATION_METRIC).time();
+      List<Column> cols;
+      try {
+        hbaseTableName_ = Util.getHBaseTableName(getMetaStoreTable());
+        // Warm up the connection and verify the table exists.
+        Util.getHBaseTable(hbaseTableName_).close();
+        columnFamilies_ = null;
+        cols = Util.loadColumns(msTable_);
+      } finally {
+        storageMetadataLoadTime_ = storageLoadTimer.stop();
+      }
       clearColumns();
       for (Column col : cols) addColumn(col);
       // Set table stats.
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 779a96e..08ab6e5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
@@ -87,6 +88,7 @@ import org.apache.impala.util.ThreadNameAnnotator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.codahale.metrics.Clock;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
@@ -504,16 +506,18 @@ public class HdfsTable extends Table implements FeFsTable {
   * Create HdfsPartition objects corresponding to 'msPartitions' and add them to this
   * table's partition list. Any partition metadata will be reset and loaded from
   * scratch. For each partition created, we load the block metadata for each data file
-   * under it.
+   * under it. Returns time spent loading the filesystem metadata in nanoseconds.
    *
   * If there are no partitions in the Hive metadata, a single partition is added with no
    * partition keys.
    */
-  private void loadAllPartitions(
+  private long loadAllPartitions(
       List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws IOException,
       CatalogException {
     Preconditions.checkNotNull(msTbl);
+    final Clock clock = Clock.defaultClock();
+    long startTime = clock.getTick();
     initializePartitionMetadata(msTbl);
     FsPermissionCache permCache = preloadPermissionsCache(msPartitions);
 
@@ -541,6 +545,7 @@ public class HdfsTable extends Table implements FeFsTable {
     }
     // Load the file metadata from scratch.
     loadFileMetadataForPartitions(partitionMap_.values(), /*isRefresh=*/false);
+    return clock.getTick() - startTime;
   }
 
 
@@ -926,7 +931,10 @@ public class HdfsTable extends Table implements FeFsTable {
         loadTableSchema ? "table definition and " : "",
        partitionsToUpdate == null ? "all" : String.valueOf(partitionsToUpdate.size()),
         msTbl.getDbName(), msTbl.getTableName(), reason);
-    LOG.info(annotation);;
+    LOG.info(annotation);
+    final Timer storageLdTimer =
+        getMetrics().getTimer(Table.STORAGE_METADATA_LOAD_DURATION_METRIC);
+    storageMetadataLoadTime_ = 0;
     try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation)) {
       // turn all exceptions into TableLoadingException
       msTable_ = msTbl;
@@ -940,16 +948,19 @@ public class HdfsTable extends Table implements FeFsTable {
             //TODO writeIDs may also be loaded in other code paths.
             loadValidWriteIdList(client);
         }
+
         // Load partition and file metadata
         if (reuseMetadata) {
           // Incrementally update this table's partitions and file metadata
           Preconditions.checkState(
               partitionsToUpdate == null || loadParitionFileMetadata);
-          updateMdFromHmsTable(msTbl);
+          storageMetadataLoadTime_ += updateMdFromHmsTable(msTbl);
           if (msTbl.getPartitionKeysSize() == 0) {
-            if (loadParitionFileMetadata) updateUnpartitionedTableFileMd();
+            if (loadParitionFileMetadata) {
+              storageMetadataLoadTime_ += updateUnpartitionedTableFileMd();
+            }
           } else {
-            updatePartitionsFromHms(
+            storageMetadataLoadTime_ += updatePartitionsFromHms(
                 client, partitionsToUpdate, loadParitionFileMetadata);
           }
          LOG.info("Incrementally loaded table metadata for: " + getFullName());
@@ -960,7 +971,7 @@ public class HdfsTable extends Table implements FeFsTable {
               MetaStoreUtil.fetchAllPartitions(
                   client, db_.getName(), name_, NUM_PARTITION_FETCH_RETRIES);
          LOG.info("Fetched partition metadata from the Metastore: " + getFullName());
-          loadAllPartitions(msPartitions, msTbl);
+          storageMetadataLoadTime_ = loadAllPartitions(msPartitions, msTbl);
         }
         if (loadTableSchema) setAvroSchema(client, msTbl);
         setTableStats(msTbl);
@@ -973,37 +984,47 @@ public class HdfsTable extends Table implements FeFsTable {
             + getFullName(), e);
       }
     } finally {
+      storageLdTimer.update(storageMetadataLoadTime_, TimeUnit.NANOSECONDS);
       context.stop();
     }
   }
 
   /**
    * Updates the table metadata, including 'hdfsBaseDir_', 'isMarkedCached_',
-   * and 'accessLevel_' from 'msTbl'. Throws an IOException if there was an error
-   * accessing the table location path.
+   * and 'accessLevel_' from 'msTbl'. Returns time spent accessing file system
+   * in nanoseconds. Throws an IOException if there was an error accessing
+   * the table location path.
    */
-  private void updateMdFromHmsTable(org.apache.hadoop.hive.metastore.api.Table msTbl)
+  private long updateMdFromHmsTable(org.apache.hadoop.hive.metastore.api.Table msTbl)
       throws IOException {
     Preconditions.checkNotNull(msTbl);
+    final Clock clock = Clock.defaultClock();
+    long filesystemAccessTime = 0;
+    long startTime = clock.getTick();
     hdfsBaseDir_ = msTbl.getSd().getLocation();
    isMarkedCached_ = HdfsCachingUtil.validateCacheParams(msTbl.getParameters());
     Path location = new Path(hdfsBaseDir_);
     accessLevel_ = getAvailableAccessLevel(getFullName(), location,
         new FsPermissionCache());
+    filesystemAccessTime = clock.getTick() - startTime;
     setMetaStoreTable(msTbl);
+    return filesystemAccessTime;
   }
 
   /**
    * Incrementally updates the file metadata of an unpartitioned HdfsTable.
+   * Returns time spent updating the file metadata in nanoseconds.
    *
    * This is optimized for the case where few files have changed. See
    * {@link #refreshFileMetadata(Path, List)} above for details.
    */
-  private void updateUnpartitionedTableFileMd() throws CatalogException {
+  private long updateUnpartitionedTableFileMd() throws CatalogException {
     Preconditions.checkState(getNumClusteringCols() == 0);
     if (LOG.isTraceEnabled()) {
       LOG.trace("update unpartitioned table: " + getFullName());
     }
+    final Clock clock = Clock.defaultClock();
+    long startTime = clock.getTick();
    HdfsPartition oldPartition = Iterables.getOnlyElement(partitionMap_.values());
 
     // Instead of updating the existing partition in place, we create a new one
@@ -1021,18 +1042,21 @@ public class HdfsTable extends Table implements FeFsTable {
     addPartition(part);
     if (isMarkedCached_) part.markCached();
     loadFileMetadataForPartitions(ImmutableList.of(part), /*isRefresh=*/true);
+    return clock.getTick() - startTime;
   }
 
   /**
-   * Updates the partitions of an HdfsTable so that they are in sync with the Hive
-   * Metastore. It reloads partitions that were marked 'dirty' by doing a DROP + CREATE.
-   * It removes from this table partitions that no longer exist in the Hive Metastore and
-   * adds partitions that were added externally (e.g. using Hive) to the Hive Metastore
-   * but do not exist in this table. If 'loadParitionFileMetadata' is true, it triggers
-   * file/block metadata reload for the partitions specified in 'partitionsToUpdate', if
-   * any, or for all the table partitions if 'partitionsToUpdate' is null.
+   * Updates the partitions of an HdfsTable so that they are in sync with the
+   * Hive Metastore. It reloads partitions that were marked 'dirty' by doing a
+   * DROP + CREATE. It removes from this table partitions that no longer exist
+   * in the Hive Metastore and adds partitions that were added externally (e.g.
+   * using Hive) to the Hive Metastore but do not exist in this table. If
+   * 'loadParitionFileMetadata' is true, it triggers file/block metadata reload
+   * for the partitions specified in 'partitionsToUpdate', if any, or for all
+   * the table partitions if 'partitionsToUpdate' is null. Returns time
+   * spent loading file metadata in nanoseconds.
    */
-  private void updatePartitionsFromHms(IMetaStoreClient client,
+  private long updatePartitionsFromHms(IMetaStoreClient client,
       Set<String> partitionsToUpdate, boolean loadPartitionFileMetadata)
       throws Exception {
    if (LOG.isTraceEnabled()) LOG.trace("Sync table partitions: " + getFullName());
@@ -1096,14 +1120,19 @@ public class HdfsTable extends Table implements FeFsTable {
     // Load file metadata. Until we have a notification mechanism for when a
     // file changes in hdfs, it is sometimes required to reload all the file
     // descriptors and block metadata of a table (e.g. REFRESH statement).
+    long fileLoadMdTime = 0;
     if (loadPartitionFileMetadata) {
+      final Clock clock = Clock.defaultClock();
+      long startTime = clock.getTick();
       if (partitionsToUpdate != null) {
         Preconditions.checkState(partitionsToLoadFiles.isEmpty());
        // Only reload file metadata of partitions specified in 'partitionsToUpdate'
         partitionsToLoadFiles = getPartitionsForNames(partitionsToUpdate);
       }
      loadFileMetadataForPartitions(partitionsToLoadFiles, /* isRefresh=*/true);
+      fileLoadMdTime = clock.getTick() - startTime;
     }
+    return fileLoadMdTime;
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index 338c979..21feda7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -310,14 +310,19 @@ public class KuduTable extends Table implements FeKuduTable {
             " property found for Kudu table " + kuduTableName_);
       }
       setTableStats(msTable_);
-      // Load metadata from Kudu and HMS
+      // Load metadata from Kudu
+      final Timer.Context ctxStorageLdTime =
+          getMetrics().getTimer(Table.STORAGE_METADATA_LOAD_DURATION_METRIC).time();
       try {
         loadSchemaFromKudu();
-        loadAllColumnStats(msClient);
       } catch (ImpalaRuntimeException e) {
        throw new TableLoadingException("Error loading metadata for Kudu table " +
             kuduTableName_, e);
+      } finally {
+        storageMetadataLoadTime_ = ctxStorageLdTime.stop();
       }
+      // Load from HMS
+      loadAllColumnStats(msClient);
       refreshLastUsedTime();
       // Avoid updating HMS if the schema didn't change.
       if (msTable_.equals(msTbl)) return;
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index b0264c2..acab108 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -115,6 +115,9 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
   // True if this object is stored in an Impalad catalog cache.
   protected boolean storedInImpaladCatalogCache_ = false;
 
+  // Time spent in the source systems loading/reloading the fs metadata for the table.
+  protected long storageMetadataLoadTime_ = 0;
+
   // Last used time of this table in nanoseconds as returned by
  // CatalogdTableInvalidator.nanoTime(). This is only set in catalogd and not used by
   // impalad.
@@ -138,6 +141,8 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
   public static final String REFRESH_DURATION_METRIC = "refresh-duration";
   public static final String ALTER_DURATION_METRIC = "alter-duration";
   public static final String LOAD_DURATION_METRIC = "load-duration";
+  public static final String STORAGE_METADATA_LOAD_DURATION_METRIC =
+      "storage-metadata-load-duration";
 
   // Table property key for storing the time of the last DDL operation.
   public static final String TBL_PROP_LAST_DDL_TIME = "transient_lastDdlTime";
@@ -197,10 +202,14 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
     metrics_.addTimer(REFRESH_DURATION_METRIC);
     metrics_.addTimer(ALTER_DURATION_METRIC);
     metrics_.addTimer(LOAD_DURATION_METRIC);
+    metrics_.addTimer(STORAGE_METADATA_LOAD_DURATION_METRIC);
   }
 
   public Metrics getMetrics() { return metrics_; }
 
+  // Returns storage wait time during metadata load.
+  public long getStorageLoadTime() { return storageMetadataLoadTime_; }
+
  // Returns true if this table reference comes from the impalad catalog cache or if it
  // is loaded from the testing framework. Returns false if this table reference points
   // to a table stored in the catalog server.
@@ -399,6 +408,8 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
    accessLevel_ = thriftTable.isSetAccess_level() ? thriftTable.getAccess_level() :
         TAccessLevel.READ_WRITE;
 
+    storageMetadataLoadTime_ = thriftTable.getStorage_metadata_load_time();
+
     storedInImpaladCatalogCache_ = true;
     validWriteIds_ = thriftTable.isSetValid_write_ids() ?
         thriftTable.getValid_write_ids() : null;
@@ -435,6 +446,7 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
 
     TTable table = new TTable(db_.getName(), name_);
     table.setAccess_level(accessLevel_);
+    table.setStorage_metadata_load_time(storageMetadataLoadTime_);
 
     // Populate both regular columns and clustering columns (if there are any).
     table.setColumns(new ArrayList<>());
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index fef2fef..62e1aeb 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -585,3 +585,32 @@ class TestObservability(ImpalaTestSuite):
     query = "select count (*) from functional.alltypes"
     runtime_profile = self.execute_query(query).runtime_profile
     self.__verify_profile_event_sequence(event_regexes, runtime_profile)
+
+  def test_query_profile_storage_load_time_filesystem(self):
+    """Test that when a query needs to load metadata for table(s), the
+    storage load time should appear in the profile. Tests file systems."""
+    self.__check_query_profile_storage_load_time("functional")
+
+  @SkipIfS3.hbase
+  @SkipIfLocal.hbase
+  @SkipIfIsilon.hbase
+  @SkipIfABFS.hbase
+  @SkipIfADLS.hbase
+  def test_query_profile_storage_load_time(self):
+    """Test that when a query needs to load metadata for table(s), the
+    storage load time should appear in the profile. Tests Kudu and HBase."""
+    # KUDU table
+    self.__check_query_profile_storage_load_time("functional_kudu")
+
+    # HBASE table
+    self.__check_query_profile_storage_load_time("functional_hbase")
+
+  def __check_query_profile_storage_load_time(self, db_name):
+    """Check query profile for storage load time with a given database."""
+    self.execute_query("invalidate metadata {0}.alltypes".format(db_name))
+    query = "select count (*) from {0}.alltypes".format(db_name)
+    runtime_profile = self.execute_query(query).runtime_profile
+    assert "storage-load-time" in runtime_profile
+    # Run the query a second time; no metadata loading is needed.
+    runtime_profile = self.execute_query(query).runtime_profile
+    assert "storage-load-time" not in runtime_profile
