IMPALA-3718: Add test_cancellation tests for Kudu

Additional functional tests for Kudu.
Change-Id: Icf3d3853e7075991f6d12f125407ebdbe6a287e2 Reviewed-on: http://gerrit.cloudera.org:8080/4700 Reviewed-by: Matthew Jacobs <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/8d7b01fa Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/8d7b01fa Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/8d7b01fa Branch: refs/heads/master Commit: 8d7b01faea6362af675a2a335b462fad3e0caa03 Parents: 8a49cea Author: Matthew Jacobs <[email protected]> Authored: Wed Sep 21 15:05:54 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Fri Oct 21 23:32:58 2016 +0000 ---------------------------------------------------------------------- tests/query_test/test_cancellation.py | 49 +++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8d7b01fa/tests/query_test/test_cancellation.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_cancellation.py b/tests/query_test/test_cancellation.py index 265c781..91e81dc 100644 --- a/tests/query_test/test_cancellation.py +++ b/tests/query_test/test_cancellation.py @@ -27,13 +27,17 @@ from tests.common.test_vector import TestDimension from tests.common.impala_test_suite import ImpalaTestSuite from tests.verifiers.metric_verifier import MetricVerifier -# Queries to execute. Use the TPC-H dataset because tables are large so queries take some -# time to execute. 
-QUERIES = ['select l_returnflag from lineitem', - 'select count(l_returnflag) from lineitem', - 'select * from lineitem limit 50', - 'compute stats lineitem', - 'select * from lineitem order by l_orderkey'] +# PRIMARY KEY for lineitem +LINEITEM_PK = 'l_orderkey, l_partkey, l_suppkey, l_linenumber' + +# Queries to execute, mapped to a unique PRIMARY KEY for use in CTAS with Kudu. If None +# is specified for the PRIMARY KEY, it will not be used in a CTAS statement on Kudu. +# Use the TPC-H dataset because tables are large so queries take some time to execute. +QUERIES = {'select l_returnflag from lineitem' : None, + 'select count(l_returnflag) pk from lineitem' : 'pk', + 'select * from lineitem limit 50' : LINEITEM_PK, + 'compute stats lineitem' : None, + 'select * from lineitem order by l_orderkey' : LINEITEM_PK} QUERY_TYPE = ["SELECT", "CTAS"] @@ -59,19 +63,25 @@ class TestCancellation(ImpalaTestSuite): @classmethod def add_test_dimensions(cls): super(TestCancellation, cls).add_test_dimensions() - cls.TestMatrix.add_dimension(TestDimension('query', *QUERIES)) + cls.TestMatrix.add_dimension(TestDimension('query', *QUERIES.keys())) cls.TestMatrix.add_dimension(TestDimension('query_type', *QUERY_TYPE)) cls.TestMatrix.add_dimension(TestDimension('cancel_delay', *CANCEL_DELAY_IN_SECONDS)) cls.TestMatrix.add_dimension(TestDimension('action', *DEBUG_ACTIONS)) cls.TestMatrix.add_dimension(TestDimension('max_block_mgr_memory', 0)) cls.TestMatrix.add_constraint(lambda v: v.get_value('query_type') != 'CTAS' or (\ - v.get_value('table_format').file_format in ['text', 'parquet'] and\ + v.get_value('table_format').file_format in ['text', 'parquet', 'kudu'] and\ v.get_value('table_format').compression_codec == 'none')) cls.TestMatrix.add_constraint(lambda v: v.get_value('exec_option')['batch_size'] == 0) # Ignore 'compute stats' queries for the CTAS query type. 
cls.TestMatrix.add_constraint(lambda v: not (v.get_value('query_type') == 'CTAS' and v.get_value('query').startswith('compute stats'))) + + # Ignore CTAS on Kudu if there is no PRIMARY KEY specified. + cls.TestMatrix.add_constraint(lambda v: not (v.get_value('query_type') == 'CTAS' and + v.get_value('table_format').file_format == 'kudu' and + QUERIES[v.get_value('query')] is None)) + # tpch tables are not generated for hbase as the data loading takes a very long time. # TODO: Add cancellation tests for hbase. cls.TestMatrix.add_constraint(lambda v:\ @@ -87,15 +97,24 @@ class TestCancellation(ImpalaTestSuite): query_type = vector.get_value('query_type') if query_type == "CTAS": self.cleanup_test_table(vector.get_value('table_format')) - query = "create table ctas_cancel stored as %sfile as %s" %\ - (vector.get_value('table_format').file_format, query) + file_format = vector.get_value('table_format').file_format + if file_format == 'kudu': + assert QUERIES.has_key(query) and QUERIES[query] is not None,\ + "PRIMARY KEY for query %s not specified" % query + query = "create table ctas_cancel primary key (%s) "\ + "distribute by hash into 3 buckets stored as kudu as %s" %\ + (QUERIES[query], query) + else: + query = "create table ctas_cancel stored as %sfile as %s" %\ + (file_format, query) action = vector.get_value('action') # node ID 0 is the scan node debug_action = '0:GETNEXT:' + action if action != None else '' vector.get_value('exec_option')['debug_action'] = debug_action - vector.get_value('exec_option')['max_block_mgr_memory'] = vector.get_value('max_block_mgr_memory') + vector.get_value('exec_option')['max_block_mgr_memory'] =\ + vector.get_value('max_block_mgr_memory') # Execute the query multiple times, cancelling it each time. 
for i in xrange(NUM_CANCELATION_ITERATIONS): @@ -167,7 +186,8 @@ class TestCancellationSerial(TestCancellation): # Don't run across all cancel delay options unless running in exhaustive mode if cls.exploration_strategy() != 'exhaustive': cls.TestMatrix.add_constraint(lambda v: v.get_value('cancel_delay') in [3]) - cls.TestMatrix.add_constraint(lambda v: v.get_value('query') == choice(QUERIES)) + cls.TestMatrix.add_constraint(lambda v: v.get_value('query') ==\ + choice(QUERIES.keys())) @pytest.mark.execute_serially def test_cancel_insert(self, vector): @@ -186,7 +206,8 @@ class TestCancellationFullSort(TestCancellation): cls.TestMatrix.add_dimension(TestDimension('query', SORT_QUERY)) cls.TestMatrix.add_dimension(TestDimension('query_type', 'SELECT')) cls.TestMatrix.add_dimension(TestDimension('cancel_delay', *SORT_CANCEL_DELAY)) - cls.TestMatrix.add_dimension(TestDimension('max_block_mgr_memory', *SORT_BLOCK_MGR_LIMIT)) + cls.TestMatrix.add_dimension(TestDimension('max_block_mgr_memory',\ + *SORT_BLOCK_MGR_LIMIT)) cls.TestMatrix.add_dimension(TestDimension('action', None)) cls.TestMatrix.add_constraint(lambda v:\ v.get_value('table_format').file_format =='parquet' and\
