This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push: new 0c8fc997e IMPALA-12395: Override scan cardinality for optimized count star 0c8fc997e is described below commit 0c8fc997ef7df09b675180a7baa1482852d60b11 Author: Riza Suminto <riza.sumi...@cloudera.com> AuthorDate: Tue Aug 22 17:51:28 2023 -0700 IMPALA-12395: Override scan cardinality for optimized count star The cardinality estimate in HdfsScanNode.java for count queries does not account for the fact that the count optimization only scans metadata and not the actual columns. An optimized count star scan returns only 1 row per Parquet row group. This patch overrides the scan cardinality with the total number of files, which is the closest estimate of the number of row groups. A similar override already exists in IcebergScanNode.java. Testing: - Add count query testcases in test_query_cpu_count_divisor_default - Pass core tests Change-Id: Id5ce967657208057d50bd80adadac29ebb51cbc5 Reviewed-on: http://gerrit.cloudera.org:8080/20406 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- .../org/apache/impala/planner/HdfsScanNode.java | 7 ++++ .../queries/PlannerTest/resource-requirements.test | 9 +++-- tests/custom_cluster/test_executor_groups.py | 47 +++++++++++++++++++--- tests/custom_cluster/test_query_retries.py | 33 ++++++++------- 4 files changed, 73 insertions(+), 23 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index 2e1a87d88..41a230741 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -1558,6 +1558,13 @@ public class HdfsScanNode extends ScanNode { numRangesAdjusted : Math.min(inputCardinality_, numRangesAdjusted); } + + if (countStarSlot_ != null) { + // We are doing optimized count star. 
Override cardinality with total num files. + long totalFiles = sumValues(totalFilesPerFs_); + inputCardinality_ = totalFiles; + cardinality_ = totalFiles; + } if (LOG.isTraceEnabled()) { LOG.trace("HdfsScan: cardinality_=" + Long.toString(cardinality_)); } diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test index ee00ac9c4..abf0e1417 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test @@ -1869,6 +1869,7 @@ select count(*) from tpch_parquet.lineitem ---- PLAN Max Per-Host Resource Reservation: Memory=128.00KB Threads=2 Per-Host Resource Estimates: Memory=10MB +Codegen disabled by planner Analyzed query: SELECT count(*) FROM tpch_parquet.lineitem F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 @@ -1890,11 +1891,12 @@ PLAN-ROOT SINK columns: all extrapolated-rows=disabled max-scan-range-rows=2.14M mem-estimate=1.00MB mem-reservation=128.00KB thread-reservation=1 - tuple-ids=0 row-size=8B cardinality=6.00M + tuple-ids=0 row-size=8B cardinality=3 in pipelines: 00(GETNEXT) ---- DISTRIBUTEDPLAN Max Per-Host Resource Reservation: Memory=128.00KB Threads=3 Per-Host Resource Estimates: Memory=10MB +Codegen disabled by planner Analyzed query: SELECT count(*) FROM tpch_parquet.lineitem F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 @@ -1929,11 +1931,12 @@ Per-Host Resources: mem-estimate=1.02MB mem-reservation=128.00KB thread-reservat columns: all extrapolated-rows=disabled max-scan-range-rows=2.14M mem-estimate=1.00MB mem-reservation=128.00KB thread-reservation=1 - tuple-ids=0 row-size=8B cardinality=6.00M + tuple-ids=0 row-size=8B cardinality=3 in pipelines: 00(GETNEXT) ---- PARALLELPLANS Max Per-Host Resource Reservation: Memory=128.00KB Threads=2 Per-Host Resource Estimates: Memory=80MB +Codegen 
disabled by planner Analyzed query: SELECT count(*) FROM tpch_parquet.lineitem F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 @@ -1968,7 +1971,7 @@ Per-Instance Resources: mem-estimate=80.02MB mem-reservation=128.00KB thread-res columns: all extrapolated-rows=disabled max-scan-range-rows=2.14M mem-estimate=80.00MB mem-reservation=128.00KB thread-reservation=0 - tuple-ids=0 row-size=8B cardinality=6.00M + tuple-ids=0 row-size=8B cardinality=3 in pipelines: 00(GETNEXT) ==== # Sort diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py index bd6ffca81..7d250bda9 100644 --- a/tests/custom_cluster/test_executor_groups.py +++ b/tests/custom_cluster/test_executor_groups.py @@ -764,9 +764,9 @@ class TestExecutorGroups(CustomClusterTestSuite): def test_query_assignment_with_two_exec_groups(self): """This test verifies that query assignment works with two executor groups with different number of executors and memory limit in each.""" - # A small query with estimated memory per host of 10MB that can run on the small + # A small query with estimated memory per host of 16MB that can run on the small # executor group - SMALL_QUERY = "select count(*) from tpcds_parquet.date_dim;" + SMALL_QUERY = "select count(*) from tpcds_parquet.date_dim where d_year=2022;" # A large query with estimated memory per host of 132MB that can only run on # the large executor group. LARGE_QUERY = "select * from tpcds_parquet.store_sales where ss_item_sk = 1 limit 50;" @@ -1093,6 +1093,41 @@ class TestExecutorGroups(CustomClusterTestSuite): 'PROCESSING_COST_MIN_THREADS': '', 'MAX_FRAGMENT_INSTANCES_PER_NODE': ''}) + # BEGIN testing count queries + # Test optimized count star query with 1824 scan ranges assign to small group. 
+ self._run_query_and_verify_profile( + "SELECT count(*) FROM tpcds_parquet.store_sales", + ["Executor Group: root.small-group", "EffectiveParallelism: 10", + "ExecutorGroupsConsidered: 2"]) + + # Test optimized count star query with 383 scan ranges assign to tiny group. + self._run_query_and_verify_profile( + "SELECT count(*) FROM tpcds_parquet.store_sales WHERE ss_sold_date_sk < 2451200", + ["Executor Group: root.tiny-group", "EffectiveParallelism: 2", + "ExecutorGroupsConsidered: 1"]) + + # Test optimized count star query with 1 scan range detected as trivial query + # and assign to tiny group. + self._run_query_and_verify_profile( + "SELECT count(*) FROM tpcds_parquet.date_dim", + ["Executor Group: empty group (using coordinator only)", + "ExecutorGroupsConsidered: 1", + "Verdict: Assign to first group because the number of nodes is 1"]) + + # Test unoptimized count star query assign to small group. + self._run_query_and_verify_profile( + ("SELECT count(*) FROM tpcds_parquet.store_sales " + "WHERE ss_ext_discount_amt != 0.3857"), + ["Executor Group: root.small-group", "EffectiveParallelism: 10", + "ExecutorGroupsConsidered: 2"]) + + # Test zero slot scan query assign to small group. + self._run_query_and_verify_profile( + "SELECT count(ss_sold_date_sk) FROM tpcds_parquet.store_sales", + ["Executor Group: root.small-group", "EffectiveParallelism: 10", + "ExecutorGroupsConsidered: 2"]) + # END testing count queries + # BEGIN testing insert + MAX_FS_WRITER # Test unpartitioned insert, small scan, no MAX_FS_WRITER. # Scanner and writer will collocate since num scanner equals to num writer (1). 
@@ -1186,11 +1221,11 @@ class TestExecutorGroups(CustomClusterTestSuite): # END testing insert + MAX_FS_WRITER # Check resource pools on the Web queries site and admission site - self._verify_query_num_for_resource_pool("root.small", 7) - self._verify_query_num_for_resource_pool("root.tiny", 4) + self._verify_query_num_for_resource_pool("root.small", 10) + self._verify_query_num_for_resource_pool("root.tiny", 6) self._verify_query_num_for_resource_pool("root.large", 12) - self._verify_total_admitted_queries("root.small", 8) - self._verify_total_admitted_queries("root.tiny", 6) + self._verify_total_admitted_queries("root.small", 11) + self._verify_total_admitted_queries("root.tiny", 8) self._verify_total_admitted_queries("root.large", 16) @pytest.mark.execute_serially diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py index 7d039708a..56e852970 100644 --- a/tests/custom_cluster/test_query_retries.py +++ b/tests/custom_cluster/test_query_retries.py @@ -79,6 +79,11 @@ class TestQueryRetries(CustomClusterTestSuite): union all select count(*) from functional.alltypes where bool_col = sleep(50)""" + # A simple count query with predicate. The predicate is needed so that the planner does + # not create the optimized count(star) query plan. + _count_query = "select count(*) from tpch_parquet.lineitem where l_orderkey < 50" + _count_query_result = "55" + @classmethod def get_workload(cls): return 'functional-query' @@ -252,7 +257,7 @@ class TestQueryRetries(CustomClusterTestSuite): # Kill an impalad, and run a query. The query should be retried. 
killed_impalad = self.__kill_random_impalad() - query = "select count(*) from tpch_parquet.lineitem" + query = self._count_query handle = self.execute_query_async(query, query_options={'retry_failed_queries': 'true'}) self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60) @@ -264,7 +269,7 @@ class TestQueryRetries(CustomClusterTestSuite): results = self.client.fetch(query, handle) assert results.success assert len(results.data) == 1 - assert "6001215" in results.data[0] + assert self._count_query_result in results.data[0] # The runtime profile of the retried query. retried_runtime_profile = self.client.get_runtime_profile(handle) @@ -312,7 +317,7 @@ class TestQueryRetries(CustomClusterTestSuite): # and the query should be retried. Add delay before admission so that the 2nd node # is removed from the blacklist before scheduler makes schedule for the retried # query. - query = "select count(*) from tpch_parquet.lineitem" + query = self._count_query handle = self.execute_query_async(query, query_options={'retry_failed_queries': 'true', 'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'}) @@ -325,7 +330,7 @@ class TestQueryRetries(CustomClusterTestSuite): results = self.client.fetch(query, handle) assert results.success assert len(results.data) == 1 - assert "6001215" in results.data[0] + assert self._count_query_result in results.data[0] # The runtime profile of the retried query. 
retried_runtime_profile = self.client.get_runtime_profile(handle) @@ -375,7 +380,7 @@ class TestQueryRetries(CustomClusterTestSuite): rpc_not_accessible_impalad = self.cluster.impalads[1] assert rpc_not_accessible_impalad.service.krpc_port == FAILED_KRPC_PORT - query = "select count(*) from tpch_parquet.lineitem" + query = self._count_query handle = self.execute_query_async(query, query_options={'retry_failed_queries': 'true', 'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'}) @@ -698,7 +703,7 @@ class TestQueryRetries(CustomClusterTestSuite): # Kill an impalad, and run a query. The query should be retried. self.cluster.impalads[1].kill() - query = "select count(*) from tpch_parquet.lineitem" + query = self._count_query handle = self.execute_query_async(query, query_options={'retry_failed_queries': 'true'}) self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60) @@ -737,7 +742,7 @@ class TestQueryRetries(CustomClusterTestSuite): # Kill an impalad, and run a query. The query should be retried. self.cluster.impalads[1].kill() - query = "select count(*) from tpch_parquet.lineitem" + query = self._count_query handle = self.execute_query_async(query, query_options={'retry_failed_queries': 'true'}) self.__wait_until_retry_state(handle, 'RETRYING') @@ -767,7 +772,7 @@ class TestQueryRetries(CustomClusterTestSuite): # Kill an impalad, and run a query. The query should be retried. self.cluster.impalads[1].kill() - query = "select count(*) from tpch_parquet.lineitem" + query = self._count_query handle = self.execute_query_async(query, query_options={'retry_failed_queries': 'true'}) @@ -791,7 +796,7 @@ class TestQueryRetries(CustomClusterTestSuite): # Kill an impalad, and run a query. The query should be retried. 
self.cluster.impalads[1].kill() - query = "select count(*) from tpch_parquet.lineitem" + query = self._count_query self.hs2_client.set_configuration({'retry_failed_queries': 'true'}) self.hs2_client.set_configuration_option('impala.resultset.cache.size', '1024') self.hs2_client.execute_async(query) @@ -818,7 +823,7 @@ class TestQueryRetries(CustomClusterTestSuite): # Kill an impalad, and run a query. The query should be retried. self.cluster.impalads[1].kill() - query = "select count(*) from tpch_parquet.lineitem" + query = self._count_query self.execute_query_async(query, query_options={'retry_failed_queries': 'true'}) # The number of in-flight queries is 0 at the beginning, then 1 when the original # query is submitted. It's 2 when the retried query is registered. Although the retry @@ -848,7 +853,7 @@ class TestQueryRetries(CustomClusterTestSuite): # Kill an impalad, and run a query. The query should be retried. self.cluster.impalads[1].kill() - query = "select count(*) from tpch_parquet.lineitem" + query = self._count_query handle = self.execute_query_async(query, query_options={'retry_failed_queries': 'true', 'query_timeout_s': '1'}) self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 60) @@ -887,7 +892,7 @@ class TestQueryRetries(CustomClusterTestSuite): # Kill an impalad, and run a query. The query should be retried. self.cluster.impalads[1].kill() - query = "select count(*) from tpch_parquet.lineitem" + query = self._count_query client = self.cluster.get_first_impalad().service.create_beeswax_client() client.set_configuration({'retry_failed_queries': 'true'}) handle = client.execute_async(query) @@ -917,7 +922,7 @@ class TestQueryRetries(CustomClusterTestSuite): """Test query retries with the HS2 protocol. 
Enable the results set cache as well and test that query retries work with the results cache.""" self.cluster.impalads[1].kill() - query = "select count(*) from tpch_parquet.lineitem" + query = self._count_query self.hs2_client.set_configuration({'retry_failed_queries': 'true'}) self.hs2_client.set_configuration_option('impala.resultset.cache.size', '1024') handle = self.hs2_client.execute_async(query) @@ -926,7 +931,7 @@ class TestQueryRetries(CustomClusterTestSuite): results = self.hs2_client.fetch(query, handle) assert results.success assert len(results.data) == 1 - assert int(results.data[0]) == 6001215 + assert results.data[0] == self._count_query_result # Validate the live exec summary. retried_query_id = \