IMPALA-7622: adds profile metrics for incremental stats

Reapplies the change after fixing where the frontend profile is placed in the runtime profile.
When computing incremental statistics by fetching the stats directly from catalogd, a potentially expensive RPC is made from the impalad coordinator to catalogd. This change adds metrics to the frontend section of the profile to track how long the request takes, the number of compressed bytes received, and the number of partitions received (both in total and with statistics).

The profile for a 'compute incremental ...' command on a table with no statistics looks like this:

  Frontend:
     - StatsFetch.CompressedBytes: 0
     - StatsFetch.TotalPartitions: 24
     - StatsFetch.NumPartitionsWithStats: 0
     - StatsFetch.Time: 26ms

And the profile looks as follows when the table has stats, so the stats are fetched:

  Frontend:
     - StatsFetch.CompressedBytes: 24622
     - StatsFetch.TotalPartitions: 23
     - StatsFetch.NumPartitionsWithStats: 23
     - StatsFetch.Time: 14ms

Testing:
- manual inspection
- e2e test to check the profile

Change-Id: I94559a749500d44aa6aad564134d55c39e1d5273
Reviewed-on: http://gerrit.cloudera.org:8080/11670
Reviewed-by: Tianyi Wang <tw...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>

Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/97f02829
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/97f02829
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/97f02829

Branch: refs/heads/master
Commit: 97f028299c9d9d7493bdbeaacbf0a288678f9371
Parents: a80ec4a
Author: Vuk Ercegovac <vercego...@cloudera.com>
Authored: Wed Sep 26 16:14:43 2018 -0700
Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Committed: Fri Oct 12 23:44:42 2018 +0000

----------------------------------------------------------------------
 .../impala/analysis/ComputeStatsStmt.java | 43 ++++++++++++++++-
 tests/common/custom_cluster_test_suite.py | 2 +-
 tests/custom_cluster/test_pull_stats.py | 51 ++++++++++++++++++++
 3 files changed, 93 insertions(+), 3 deletions(-)
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/97f02829/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java index 36f88f2..24f387c 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java @@ -46,15 +46,18 @@ import org.apache.impala.common.PrintUtils; import org.apache.impala.common.RuntimeEnv; import org.apache.impala.service.BackendConfig; import org.apache.impala.service.CatalogOpExecutor; +import org.apache.impala.service.FrontendProfile; import org.apache.impala.thrift.TComputeStatsParams; import org.apache.impala.thrift.TErrorCode; import org.apache.impala.thrift.TGetPartitionStatsResponse; import org.apache.impala.thrift.TPartitionStats; import org.apache.impala.thrift.TTableName; +import org.apache.impala.thrift.TUnit; import org.apache.log4j.Logger; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -114,6 +117,21 @@ public class ComputeStatsStmt extends StatementBase { private static String AVRO_SCHEMA_MSG_SUFFIX = "Please re-create the table with " + "column definitions, e.g., using the result of 'SHOW CREATE TABLE'"; + // Metrics collected when fetching incremental statistics from Catalogd. All metrics + // are per query. + private static final String STATS_FETCH_PREFIX = "StatsFetch"; + // Time (ms) needed to fetch all partitions stats from catalogd. 
+ private static final String STATS_FETCH_TIME = STATS_FETCH_PREFIX + ".Time"; + // Number of compressed bytes received for all partitions. + private static final String STATS_FETCH_COMPRESSED_BYTES = + STATS_FETCH_PREFIX + ".CompressedBytes"; + // Number of partitions sent from Catalogd. + private static final String STATS_FETCH_TOTAL_PARTITIONS = + STATS_FETCH_PREFIX + ".TotalPartitions"; + // Number of partitions sent from Catalogd that include statistics. + private static final String STATS_FETCH_NUM_PARTITIONS_WITH_STATS = + STATS_FETCH_PREFIX + ".NumPartitionsWithStats"; + protected final TableName tableName_; protected final TableSampleClause sampleParams_; @@ -627,8 +645,6 @@ public class ComputeStatsStmt extends StatementBase { * - incremental statistics are present * - the partition is whitelisted in 'partitions' * - the partition is present in the local impalad catalog - * TODO(vercegovac): Add metrics to track time spent for these rpc's when fetching - * from catalog. Look into adding to timeline. * TODO(vercegovac): Look into parallelizing the fetch while child-queries are * running. Easiest would be to move this fetch to the backend. */ @@ -638,6 +654,10 @@ public class ComputeStatsStmt extends StatementBase { Preconditions.checkState(BackendConfig.INSTANCE.pullIncrementalStatistics() && !RuntimeEnv.INSTANCE.isTestEnv()); if (partitions.isEmpty()) return Collections.emptyMap(); + Stopwatch sw = new Stopwatch().start(); + int numCompressedBytes = 0; + int totalPartitions = 0; + int numPartitionsWithStats = 0; try { TGetPartitionStatsResponse response = analyzer.getCatalog().getPartitionStats(table.getTableName()); @@ -657,16 +677,19 @@ public class ComputeStatsStmt extends StatementBase { // local catalogs are returned. 
Map<Long, TPartitionStats> partitionStats = Maps.newHashMapWithExpectedSize(partitions.size()); + totalPartitions = partitions.size(); for (FeFsPartition part: partitions) { ByteBuffer compressedStats = response.partition_stats.get( FeCatalogUtils.getPartitionName(part)); if (compressedStats != null) { byte[] compressedStatsBytes = new byte[compressedStats.remaining()]; + numCompressedBytes += compressedStatsBytes.length; compressedStats.get(compressedStatsBytes); TPartitionStats remoteStats = PartitionStatsUtil.partStatsFromCompressedBytes( compressedStatsBytes, part); if (remoteStats != null && remoteStats.isSetIntermediate_col_stats()) { + ++numPartitionsWithStats; partitionStats.put(part.getId(), remoteStats); } } @@ -675,10 +698,26 @@ public class ComputeStatsStmt extends StatementBase { } catch (Exception e) { Throwables.propagateIfInstanceOf(e, AnalysisException.class); throw new AnalysisException("Error fetching partition statistics", e); + } finally { + recordFetchMetrics(numCompressedBytes, totalPartitions, numPartitionsWithStats, sw); } } /** + * Adds metrics to the frontend profile when fetching incremental stats from catalogd. + */ + private static void recordFetchMetrics(int numCompressedBytes, + int totalPartitions, int numPartitionsWithStats, Stopwatch stopwatch) { + FrontendProfile profile = FrontendProfile.getCurrentOrNull(); + if (profile == null) return; + profile.addToCounter(STATS_FETCH_COMPRESSED_BYTES, TUnit.BYTES, numCompressedBytes); + profile.addToCounter(STATS_FETCH_TOTAL_PARTITIONS, TUnit.NONE, totalPartitions); + profile.addToCounter(STATS_FETCH_NUM_PARTITIONS_WITH_STATS, TUnit.NONE, + numPartitionsWithStats); + profile.addToCounter(STATS_FETCH_TIME, TUnit.TIME_MS, stopwatch.elapsedMillis()); + } + + /** * Analyzes the TABLESAMPLE clause and computes the files sample to set * 'effectiveSamplePerc_'. 
* Returns the TABLESAMPLE SQL to be used for all child queries or an empty string if http://git-wip-us.apache.org/repos/asf/impala/blob/97f02829/tests/common/custom_cluster_test_suite.py ---------------------------------------------------------------------- diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py index 4274abf..0140303 100644 --- a/tests/common/custom_cluster_test_suite.py +++ b/tests/common/custom_cluster_test_suite.py @@ -194,7 +194,7 @@ class CustomClusterTestSuite(ImpalaTestSuite): if pytest.config.option.pull_incremental_statistics: cmd.append("--impalad_args=%s --catalogd_args=%s" % - ("--pull_incremental_statistcs", "--pull_incremental_statistics")) + ("--pull_incremental_statistics", "--pull_incremental_statistics")) default_query_option_kvs = [] # Put any defaults first, then any arguments after that so they can override defaults. http://git-wip-us.apache.org/repos/asf/impala/blob/97f02829/tests/custom_cluster/test_pull_stats.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_pull_stats.py b/tests/custom_cluster/test_pull_stats.py index e470ead..b852f3d 100644 --- a/tests/custom_cluster/test_pull_stats.py +++ b/tests/custom_cluster/test_pull_stats.py @@ -31,3 +31,54 @@ class TestPullStatistics(CustomClusterTestSuite): catalogd_args="--pull_incremental_statistics=true") def test_pull_stats(self, vector, unique_database): self.run_test_case('QueryTest/compute-stats-incremental', vector, unique_database) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(impalad_args="--pull_incremental_statistics=true", + catalogd_args="--pull_incremental_statistics=true") + def test_pull_stats_profile(self, vector, unique_database): + """Checks that the frontend profile includes metrics when computing + incremental statistics. 
+ """ + try: + client = self.cluster.impalads[0].service.create_beeswax_client() + create = "create table test like functional.alltypes" + load = "insert into test partition(year, month) select * from functional.alltypes" + insert = """insert into test partition(year=2009, month=1) values + (29349999, true, 4, 4, 4, 40,4.400000095367432,40.4, + "10/21/09","4","2009-10-21 03:24:09.600000000")""" + stats_all = "compute incremental stats test" + stats_part = "compute incremental stats test partition (year=2009,month=1)" + + # Checks that profile does not have metrics for incremental stats when + # the operation is not 'compute incremental stats'. + self.execute_query_expect_success(client, "use %s" % unique_database) + profile = self.execute_query_expect_success(client, create).runtime_profile + assert profile.count("StatsFetch") == 0 + # Checks that incremental stats metrics are present when 'compute incremental + # stats' is run. Since the table has no stats, expect that no bytes are fetched. + self.execute_query_expect_success(client, load) + profile = self.execute_query_expect_success(client, stats_all).runtime_profile + assert profile.count("StatsFetch") > 1 + assert profile.count("StatsFetch.CompressedBytes: 0") == 1 + # Checks that bytes fetched is non-zero since incremental stats are present now + # and should have been fetched. + self.execute_query_expect_success(client, insert) + profile = self.execute_query_expect_success(client, stats_part).runtime_profile + assert profile.count("StatsFetch") > 1 + assert profile.count("StatsFetch.CompressedBytes") == 1 + assert profile.count("StatsFetch.CompressedBytes: 0") == 0 + # Adds a partition, computes stats, and checks that the metrics in the profile + # reflect the operation. 
+ alter = "alter table test add partition(year=2011, month=1)" + insert_new_partition = """ + insert into test partition(year=2011, month=1) values + (29349999, true, 4, 4, 4, 40,4.400000095367432,40.4, + "10/21/09","4","2009-10-21 03:24:09.600000000") + """ + self.execute_query_expect_success(client, alter) + self.execute_query_expect_success(client, insert_new_partition) + profile = self.execute_query_expect_success(client, stats_all).runtime_profile + assert profile.count("StatsFetch.TotalPartitions: 25") == 1 + assert profile.count("StatsFetch.NumPartitionsWithStats: 24") == 1 + finally: + client.close()