This is an automated email from the ASF dual-hosted git repository. tmarshall pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 16493c0416d5de9c02e70c9b786e980a739020da Author: Qifan Chen <[email protected]> AuthorDate: Tue Feb 23 14:05:45 2021 -0500 IMPALA-10532 TestOverlapMinMaxFilters.test_overlap_min_max_filters seems flaky This patch addresses the flakiness seen with a particular test within overlap_min_max_filters by allowing the sum of NumRuntimeFilteredPages to be greater than an expected value. Previously, such a sum could only be equal to the expected value, which is not sufficient for various test conditions in which the scan of the parquet data files can start before the arrival of a runtime filter. The extension in test_result_verifier.py allows '>' and '<' conditions to be expressed for aggregation(SUM, <counter>), such as aggregation(SUM, NumRuntimeFilteredPages)> 80. Testing: - Ran TestOverlapMinMaxFilters. Change-Id: I93940a104bfb2d68cb1d41d7e303348190fd5972 Reviewed-on: http://gerrit.cloudera.org:8080/17111 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../queries/QueryTest/overlap_min_max_filters.test | 29 +++++++++++++++-- tests/common/test_result_verifier.py | 36 ++++++++++++++++------ 2 files changed, 52 insertions(+), 13 deletions(-) diff --git a/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters.test b/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters.test index 90cb805..49641ea 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters.test +++ b/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters.test @@ -1,12 +1,26 @@ ==== ---- QUERY +################################################## +# Create a new lineitem table with sorted l_orderkey +# with controlled number of rows per page. 
+################################################### +set PARQUET_PAGE_ROW_COUNT_LIMIT=24000; +drop table if exists tpch_parquet.lineitem_orderkey_only; +CREATE TABLE tpch_parquet.lineitem_orderkey_only(l_orderkey bigint) +sort by (l_orderkey) +STORED AS PARQUET; +insert into tpch_parquet.lineitem_orderkey_only +select l_orderkey from tpch_parquet.lineitem; +==== +---- QUERY ################################################### # Both a.l_orderkey and b.o_orderkey are BIGINT. ################################################### SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; SET MINMAX_FILTERING_LEVEL=PAGE; +SET MINMAX_FILTER_THRESHOLD=0.5; select straight_join a.l_orderkey from -tpch_parquet.lineitem a join [SHUFFLE] tpch_parquet.orders b +tpch_parquet.lineitem_orderkey_only a join [SHUFFLE] tpch_parquet.orders b where a.l_orderkey = b.o_orderkey and b.o_custkey = 5 order by l_orderkey; ---- RESULTS @@ -30,13 +44,20 @@ and b.o_custkey = 5 order by l_orderkey; 2630562 2630562 ---- RUNTIME_PROFILE -row_regex: .*NumRuntimeFilteredPages: 9.* +aggregation(SUM, NumRuntimeFilteredPages)> 200 +==== +---- QUERY +################################################### +# Drop the table. +################################################### +drop table if exists tpch_parquet.lineitem_orderkey_only; ==== ---- QUERY ################################################### # ss_sold_time_sk is INT. ################################################### SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; +SET MINMAX_FILTER_THRESHOLD=0.5; select straight_join count(*) from tpcds_parquet.store_sales a join [SHUFFLE] tpcds_parquet.store_sales b where a.ss_sold_time_sk = b.ss_sold_time_sk and b.ss_customer_sk = 1 @@ -51,6 +72,7 @@ aggregation(SUM, NumRuntimeFilteredRowGroups): 16 # BIGINT. Implicit casting on a is involved. 
################################################### SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; +SET MINMAX_FILTER_THRESHOLD=0.5; select straight_join count(*) from tpcds_parquet.store_sales a join [SHUFFLE] tpcds_parquet.store_sales b where a.ss_sold_time_sk = b.ss_item_sk and b.ss_addr_sk < 20; @@ -66,6 +88,7 @@ aggregation(SUM, NumRuntimeFilteredRowGroups): 1824 # return the same result as the query above. ################################################### SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; +SET MINMAX_FILTER_THRESHOLD=0.5; select straight_join count(*) from tpcds_parquet.store_sales a join [SHUFFLE] tpcds_parquet.store_sales b where a.ss_sold_time_sk = b.ss_item_sk and a.ss_addr_sk < 20; @@ -89,7 +112,7 @@ where a.ss_sold_time_sk = b.ss_item_sk and a.ss_addr_sk < 20; 0 ---- RUNTIME_PROFILE aggregation(SUM, NumRuntimeFilteredRowGroups): 0 -aggregation(SUM, NumRuntimeFilteredRowPages): 0 +aggregation(SUM, NumRuntimeFilteredPages): 0 ==== ---- QUERY ################################################## diff --git a/tests/common/test_result_verifier.py b/tests/common/test_result_verifier.py index 124fa86..02e3b88 100644 --- a/tests/common/test_result_verifier.py +++ b/tests/common/test_result_verifier.py @@ -528,7 +528,7 @@ def parse_result_rows(exec_result, escape_strings=True): # Currently, the only implemented function is SUM and only integers are supported. AGGREGATION_PREFIX_PATTERN = 'aggregation\(' AGGREGATION_PREFIX = re.compile(AGGREGATION_PREFIX_PATTERN) -AGGREGATION_SYNTAX_MATCH_PATTERN = 'aggregation\((\w+)[ ]*,[ ]*([^)]+)\):[ ]*(\d+)' +AGGREGATION_SYNTAX_MATCH_PATTERN = 'aggregation\((\w+)[ ]*,[ ]*([^)]+)\)([:><])[ ]*(\d+)' def try_compile_aggregation(row_string): """ @@ -537,12 +537,13 @@ def try_compile_aggregation(row_string): aggregation. Otherwise, it returns None. 
""" if row_string and AGGREGATION_PREFIX.match(row_string): - function, field, value = re.findall(AGGREGATION_SYNTAX_MATCH_PATTERN, row_string)[0] + function, field, op, value = \ + re.findall(AGGREGATION_SYNTAX_MATCH_PATTERN, row_string)[0] # Validate function assert(function == 'SUM') # Validate value is integer expected_value = int(value) - return (function, field, expected_value) + return (function, field, op, expected_value) return None def compute_aggregation(function, field, runtime_profile): @@ -638,16 +639,31 @@ def verify_runtime_profile(expected, actual, update_section=False): # Compute the aggregations and check against values for i in xrange(len(expected_aggregations)): if (expected_aggregations[i] is None): continue - function, field, expected_value = expected_aggregations[i] + function, field, op, expected_value = expected_aggregations[i] actual_value = compute_aggregation(function, field, actual) if update_section: - updated_aggregations.append("aggregation(%s, %s): %d" - % (function, field, actual_value)) + updated_aggregations.append("aggregation(%s, %s)%s %d" + % (function, field, op, actual_value)) else: - assert actual_value == expected_value, ("Aggregation of %s over %s did not match " - "expected results.\nEXPECTED VALUE:\n%d\n\nACTUAL VALUE:\n%d" - "\n\nPROFILE:\n%s\n" - % (function, field, expected_value, actual_value, actual)) + if op == ':' and actual_value != expected_value: + assert actual_value == expected_value, ("Aggregation of %s over %s did not " + "match expected results.\nEXPECTED VALUE:\n%d\n\n\nACTUAL VALUE:\n%d\n\n" + "OP:\n%s\n\n" + "\n\nPROFILE:\n%s\n" + % (function, field, expected_value, actual_value, op, actual)) + elif op == '>' and actual_value <= expected_value: + assert actual_value > expected_value, ("Aggregation of %s over %s did not " + "match expected results.\nEXPECTED VALUE:\n%d\n\n\nACTUAL VALUE:\n%d\n\n" + "OP:\n%s\n\n" + "\n\nPROFILE:\n%s\n" + % (function, field, expected_value, actual_value, op, actual)) + 
elif op == '<' and actual_value >= expected_value: + assert actual_value < expected_value, ("Aggregation of %s over %s did not " + "match expected results.\nEXPECTED VALUE:\n%d\n\n\nACTUAL VALUE:\n%d\n\n" + "OP:\n%s\n\n" + "\n\nPROFILE:\n%s\n" + % (function, field, expected_value, actual_value, op, actual)) + return updated_aggregations
