IMPALA-6715,IMPALA-6736: fix stress TPC workload selection IMPALA-6715: The commit for IMPALA-6551 ("Change Kudu TPCDS and TPCH columns to DECIMAL") added additional decimal_v2 queries to the stress test that amount to running the same query twice. This makes the binary search run incredibly slowly.
- Fix the query selection. Add additional queries that weren't matching before, like the tpcds-q[0-9]+a.test series. - Add a test that will at least ensure that if testdata/workloads/tpc*/queries is modified, the stress test will still find the same number of queries for the given workload. There's no obvious place to put this test: it's not testing the product at all, so: - Add a new directory tests/infra for such tests and add it to tests/run-tests.py. - Move the test from IMPALA-6441 into tests/infra. Testing: - Core private build passed. I manually looked to make sure the moved and new tests ran. - Short stress test run. I checked the runtime info and saw the new TPCDS queries in the JSON. - While testing on hardware clusters downstream, I noticed... IMPALA-6736: TPC-DS Q67A is 10x more expensive to run without spilling than any other query. I fixed the --filter-query-mem-ratio option so that it actually works. This will still run Q67A during the binary search phase, but if a cluster is too small, the query will be skipped. 
Change-Id: I3e26b64d38aa8d63a176daf95c4ac5dee89508da Reviewed-on: http://gerrit.cloudera.org:8080/9758 Reviewed-by: David Knupp <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/f05cf903 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/f05cf903 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/f05cf903 Branch: refs/heads/2.x Commit: f05cf9031429a2a14a6dd964e11f067b1f40fd04 Parents: ae20eb4 Author: Michael Brown <[email protected]> Authored: Wed Mar 21 13:08:50 2018 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Tue Mar 27 03:35:00 2018 +0000 ---------------------------------------------------------------------- tests/infra/test_stress_infra.py | 60 ++++++++++++++++++++++++++++++++++ tests/metadata/test_explain.py | 22 ------------- tests/run-tests.py | 2 +- tests/stress/concurrent_select.py | 29 ++++++++++++++-- 4 files changed, 87 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/f05cf903/tests/infra/test_stress_infra.py ---------------------------------------------------------------------- diff --git a/tests/infra/test_stress_infra.py b/tests/infra/test_stress_infra.py new file mode 100644 index 0000000..58f7625 --- /dev/null +++ b/tests/infra/test_stress_infra.py @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# This module attempts to enforce infrastructural assumptions that bind test tools to +# product or other constraints. We want to stop these assumptions from breaking at +# pre-commit time, not later. + +import pytest + +from decimal import Decimal + +from tests.common.impala_test_suite import ImpalaTestSuite +from tests.stress.concurrent_select import ( + EXPECTED_TPCDS_QUERIES_COUNT, + EXPECTED_TPCH_NESTED_QUERIES_COUNT, + EXPECTED_TPCH_QUERIES_COUNT, + load_tpc_queries, + match_memory_estimate) + + +class TestStressInfra(ImpalaTestSuite): + + def test_stress_binary_search_start_point(self): + """ + Test that the stress test can use EXPLAIN to find the start point for its binary + search. + """ + result = self.client.execute("explain select 1") + mem_limit, units = match_memory_estimate(result.data) + assert isinstance(units, str) and units.upper() in ('T', 'G', 'M', 'K', ''), ( + 'unexpected units {u} from explain memory estimation\n{output}:'.format( + u=units, output='\n'.join(result.data))) + assert Decimal(mem_limit) >= 0, ( + 'unexpected value from explain\n:' + '\n'.join(result.data)) + + @pytest.mark.parametrize( + 'count_map', + [('tpcds', EXPECTED_TPCDS_QUERIES_COUNT), + ('tpch_nested', EXPECTED_TPCH_NESTED_QUERIES_COUNT), + ('tpch', EXPECTED_TPCH_QUERIES_COUNT)]) + def test_stress_finds_workloads(self, count_map): + """ + Test that the stress test will properly load TPC workloads. 
+ """ + workload, count = count_map + assert count == len(load_tpc_queries(workload)) http://git-wip-us.apache.org/repos/asf/impala/blob/f05cf903/tests/metadata/test_explain.py ---------------------------------------------------------------------- diff --git a/tests/metadata/test_explain.py b/tests/metadata/test_explain.py index 22fc177..3ad411a 100644 --- a/tests/metadata/test_explain.py +++ b/tests/metadata/test_explain.py @@ -19,11 +19,8 @@ # import re -from decimal import Decimal - from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.skip import SkipIfLocal, SkipIfNotHdfsMinicluster -from tests.stress.concurrent_select import match_memory_estimate from tests.util.filesystem_utils import WAREHOUSE # Tests the different explain levels [0-3] on a few queries. @@ -178,22 +175,3 @@ class TestExplainEmptyPartition(ImpalaTestSuite): assert "missing relevant table and/or column statistics" in explain_result # Also test IMPALA-1530 - adding the number of partitions missing stats assert "partitions: 1/2 " in explain_result - - -class TestInfraIntegration(ImpalaTestSuite): - """ - This is a test suite to ensure separate test tooling in Python is compatible with the - product. - """ - def test_stress_binary_search_start_point(self): - """ - Test that the stress test can use EXPLAIN to find the start point for its binary - search. 
- """ - result = self.client.execute("explain select 1") - mem_limit, units = match_memory_estimate(result.data) - assert isinstance(units, str) and units.upper() in ('T', 'G', 'M', 'K', ''), ( - 'unexpected units {u} from explain memory estimation\n{output}:'.format( - u=units, output='\n'.join(result.data))) - assert Decimal(mem_limit) >= 0, ( - 'unexpected value from explain\n:' + '\n'.join(result.data)) http://git-wip-us.apache.org/repos/asf/impala/blob/f05cf903/tests/run-tests.py ---------------------------------------------------------------------- diff --git a/tests/run-tests.py b/tests/run-tests.py index 9902fc9..3a8bd2e 100755 --- a/tests/run-tests.py +++ b/tests/run-tests.py @@ -34,7 +34,7 @@ from _pytest.config import FILE_OR_DIR # We whitelist valid test directories. If a new test directory is added, update this. VALID_TEST_DIRS = ['failure', 'query_test', 'stress', 'unittests', 'aux_query_tests', 'shell', 'hs2', 'catalog_service', 'metadata', 'data_errors', - 'statestore'] + 'statestore', 'infra'] TEST_DIR = os.path.join(os.environ['IMPALA_HOME'], 'tests') RESULT_DIR = os.path.join(os.environ['IMPALA_EE_TEST_LOGS_DIR'], 'results') http://git-wip-us.apache.org/repos/asf/impala/blob/f05cf903/tests/stress/concurrent_select.py ---------------------------------------------------------------------- diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py index 5146d35..fa8541c 100755 --- a/tests/stress/concurrent_select.py +++ b/tests/stress/concurrent_select.py @@ -84,6 +84,14 @@ from tests.util.thrift_util import op_handle_to_query_id LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0]) +# IMPALA-6715: Every so often the stress test or the TPC workload directories get +# changed, and the stress test loses the ability to run the full set of queries. Set +# these constants and assert that when a workload is used, all the queries we expect to +# use are there. 
+EXPECTED_TPCDS_QUERIES_COUNT = 71 +EXPECTED_TPCH_NESTED_QUERIES_COUNT = 22 +EXPECTED_TPCH_QUERIES_COUNT = 22 + # Used to short circuit a binary search of the min mem limit. Values will be considered # equal if they are within this ratio or absolute amount of each other. MEM_LIMIT_EQ_THRESHOLD_PC = 0.975 @@ -1065,7 +1073,10 @@ def load_tpc_queries(workload): queries = list() query_dir = os.path.join( os.path.dirname(__file__), "..", "..", "testdata", "workloads", workload, "queries") - file_name_pattern = re.compile(r"-(q\d+).test$") + # IMPALA-6715 and others from the past: This pattern enforces the queries we actually + # find. Both workload directories contain other queries that are not part of the TPC + # spec. + file_name_pattern = re.compile(r"^{0}-(q.*).test$".format(workload)) for query_file in os.listdir(query_dir): match = file_name_pattern.search(query_file) if not match: @@ -1956,6 +1967,7 @@ def main(): # the TPC queries are expected to always complete successfully. if args.tpcds_db: tpcds_queries = load_tpc_queries("tpcds") + assert len(tpcds_queries) == EXPECTED_TPCDS_QUERIES_COUNT for query in tpcds_queries: query.db_name = args.tpcds_db queries.extend(tpcds_queries) @@ -1964,6 +1976,7 @@ def main(): queries.extend(generate_compute_stats_queries(cursor)) if args.tpch_db: tpch_queries = load_tpc_queries("tpch") + assert len(tpch_queries) == EXPECTED_TPCH_QUERIES_COUNT for query in tpch_queries: query.db_name = args.tpch_db queries.extend(tpch_queries) @@ -1972,6 +1985,7 @@ def main(): queries.extend(generate_compute_stats_queries(cursor)) if args.tpch_nested_db: tpch_nested_queries = load_tpc_queries("tpch_nested") + assert len(tpch_nested_queries) == EXPECTED_TPCH_NESTED_QUERIES_COUNT for query in tpch_nested_queries: query.db_name = args.tpch_nested_db queries.extend(tpch_nested_queries) @@ -1980,6 +1994,7 @@ def main(): queries.extend(generate_compute_stats_queries(cursor)) if args.tpch_kudu_db: tpch_kudu_queries = load_tpc_queries("tpch") + 
assert len(tpch_kudu_queries) == EXPECTED_TPCH_QUERIES_COUNT for query in tpch_kudu_queries: query.db_name = args.tpch_kudu_db queries.extend(tpch_kudu_queries) @@ -1992,6 +2007,7 @@ def main(): queries.extend(generate_DML_queries(cursor, args.dml_mod_values)) if args.tpcds_kudu_db: tpcds_kudu_queries = load_tpc_queries("tpcds") + assert len(tpcds_kudu_queries) == EXPECTED_TPCDS_QUERIES_COUNT for query in tpcds_kudu_queries: query.db_name = args.tpcds_kudu_db queries.extend(tpcds_kudu_queries) @@ -2049,10 +2065,17 @@ def main(): # Remove any queries that would use "too many" resources. This way a larger number # of queries will run concurrently. + if query.required_mem_mb_without_spilling is not None and \ + query.required_mem_mb_without_spilling / float(impala.min_impalad_mem_mb) \ + > args.filter_query_mem_ratio: + LOG.debug( + "Filtering non-spilling query that exceeds " + "--filter-query-mem-ratio: " + query.sql) + query.required_mem_mb_without_spilling = None if query.required_mem_mb_with_spilling is None \ - or query.required_mem_mb_with_spilling / impala.min_impalad_mem_mb \ + or query.required_mem_mb_with_spilling / float(impala.min_impalad_mem_mb) \ > args.filter_query_mem_ratio: - LOG.debug("Filtered query due to mem ratio option: " + query.sql) + LOG.debug("Filtering query that exceeds --filter-query-mem-ratio: " + query.sql) del queries[idx] # Remove queries that have a nested loop join in the plan.
