http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/tests/common/kudu_test_suite.py ---------------------------------------------------------------------- diff --git a/tests/common/kudu_test_suite.py b/tests/common/kudu_test_suite.py new file mode 100644 index 0000000..7a93c12 --- /dev/null +++ b/tests/common/kudu_test_suite.py @@ -0,0 +1,148 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import os +import string +import pytest +from contextlib import contextmanager +from kudu.schema import ( + BOOL, + DOUBLE, + FLOAT, + INT16, + INT32, + INT64, + INT8, + SchemaBuilder, + STRING) +from kudu.client import Partitioning +from random import choice, sample +from string import ascii_lowercase, digits + +from tests.common.impala_test_suite import ImpalaTestSuite +from tests.common.skip import SkipIf +from tests.common.test_dimensions import create_uncompressed_text_dimension + +class KuduTestSuite(ImpalaTestSuite): + + # Lazily set. 
+ __DB_NAME = None + + @classmethod + def setup_class(cls): + if os.environ["KUDU_IS_SUPPORTED"] == "false": + pytest.skip("Kudu is not supported") + + super(KuduTestSuite, cls).setup_class() + + @classmethod + def get_workload(cls): + return 'functional-query' + + @classmethod + def add_test_dimensions(cls): + super(KuduTestSuite, cls).add_test_dimensions() + cls.TestMatrix.add_dimension(create_uncompressed_text_dimension(cls.get_workload())) + + @classmethod + def auto_create_db(cls): + return True + + @classmethod + def get_db_name(cls): + # When py.test runs with the xdist plugin, several processes are started and each + # process runs some partition of the tests. It's possible that multiple processes + # will call this method. A random value is generated so the processes won't try + # to use the same database at the same time. The value is cached so within a single + # process the same database name is always used for the class. This doesn't need to + # be thread-safe since multi-threading is never used. + if not cls.__DB_NAME: + cls.__DB_NAME = \ + choice(ascii_lowercase) + "".join(sample(ascii_lowercase + digits, 5)) + return cls.__DB_NAME + + @classmethod + def random_table_name(cls): + return "".join(choice(string.lowercase) for _ in xrange(10)) + + @classmethod + def get_kudu_table_base_name(cls, name): + return name.split(".")[-1] + + @contextmanager + def temp_kudu_table(self, kudu, col_types, name=None, num_key_cols=1, col_names=None, + prepend_db_name=True, db_name=None): + """Create and return a table. This function should be used in a "with" context. + 'kudu' must be a kudu.client.Client. If a table name is not provided, a random + name will be used. If 'prepend_db_name' is True, the table name will be prepended + with (get_db_name() + "."). If column names are not provided, the letters + "a", "b", "c", ... will be used. 
+ + Example: + with self.temp_kudu_table(kudu, [INT32]) as kudu_table: + assert kudu.table_exists(kudu_table.name) + assert not kudu.table_exists(kudu_table.name) + """ + if not col_names: + if len(col_types) > 26: + raise Exception("Too many columns for default naming") + col_names = [chr(97 + i) for i in xrange(len(col_types))] + schema_builder = SchemaBuilder() + for i, t in enumerate(col_types): + column_spec = schema_builder.add_column(col_names[i], type_=t) + if i < num_key_cols: + column_spec.nullable(False) + schema_builder.set_primary_keys(col_names[:num_key_cols]) + schema = schema_builder.build() + name = name or self.random_table_name() + if prepend_db_name: + name = (db_name or self.get_db_name().lower()) + "." + name + kudu.create_table(name, schema, + partitioning=Partitioning().add_hash_partitions(col_names[:num_key_cols], 2)) + try: + yield kudu.table(name) + finally: + if kudu.table_exists(name): + kudu.delete_table(name) + + @contextmanager + def drop_impala_table_after_context(self, cursor, table_name): + """For use in a "with" block: The named table will be dropped using the provided + cursor when the block exits. + + cursor.execute("CREATE TABLE foo ...") + with drop_impala_table_after_context(cursor, "foo"): + ... + # Now table foo no longer exists. + """ + try: + yield + finally: + cursor.execute("DROP TABLE %s" % table_name) + + def kudu_col_type_to_impala_col_type(self, col_type): + mapping = {BOOL: "BOOLEAN", + DOUBLE: "DOUBLE", + FLOAT: "FLOAT", + INT16: "SMALLINT", + INT32: "INT", + INT64: "BIGINT", + INT8: "TINYINT", + STRING: "STRING"} + if col_type not in mapping: + raise Exception("Unexpected column type: %s" % col_type) + return mapping[col_type]
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/tests/conftest.py ---------------------------------------------------------------------- diff --git a/tests/conftest.py b/tests/conftest.py index 3193c9e..c22de39 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -27,7 +27,7 @@ import logging import os import pytest -from common import KUDU_MASTER_HOST, KUDU_MASTER_PORT +from common import KUDU_MASTER_HOSTS from common.test_result_verifier import QueryTestResult from tests.common.patterns import is_valid_impala_identifier from tests.util.filesystem_utils import FILESYSTEM, ISILON_WEBHDFS_PORT @@ -288,7 +288,13 @@ def kudu_client(): """Provides a new Kudu client as a pytest fixture. The client only exists for the duration of the method it is used in. """ - kudu_client = kudu_connect(KUDU_MASTER_HOST, KUDU_MASTER_PORT) + if "," in KUDU_MASTER_HOSTS: + raise Exception("Multi-master not supported yet") + if ":" in KUDU_MASTER_HOSTS: + host, port = KUDU_MASTER_HOSTS.split(":") + else: + host, port = KUDU_MASTER_HOSTS, 7051 + kudu_client = kudu_connect(host, port) try: yield kudu_client finally: http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/tests/custom_cluster/test_kudu.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_kudu.py b/tests/custom_cluster/test_kudu.py new file mode 100644 index 0000000..898a29e --- /dev/null +++ b/tests/custom_cluster/test_kudu.py @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import logging +import pytest +from kudu.schema import INT32 + +from tests.common import KUDU_MASTER_HOSTS +from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.kudu_test_suite import KuduTestSuite + +LOG = logging.getLogger(__name__) + +class TestKuduOperations(CustomClusterTestSuite, KuduTestSuite): + + @classmethod + def get_workload(cls): + return 'functional-query' + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(impalad_args="-kudu_master_hosts=") + def test_kudu_master_hosts(self, cursor, kudu_client): + """Check behavior when -kudu_master_hosts is not provided to catalogd.""" + with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table: + table_name = self.get_kudu_table_base_name(kudu_table.name) + props = "TBLPROPERTIES('kudu.table_name'='%s')" % (kudu_table.name) + try: + cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (table_name, + props)) + assert False + except Exception as e: + assert "Table property 'kudu.master_addresses' is required" in str(e) + + cursor.execute(""" + CREATE EXTERNAL TABLE %s STORED AS KUDU + TBLPROPERTIES ('kudu.master_addresses' = '%s', + 'kudu.table_name'='%s') + """ % (table_name, KUDU_MASTER_HOSTS, kudu_table.name)) + cursor.execute("DROP TABLE %s" % table_name) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/tests/metadata/test_ddl.py ---------------------------------------------------------------------- diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py index 18ed1af..8079855 100644 --- 
a/tests/metadata/test_ddl.py +++ b/tests/metadata/test_ddl.py @@ -220,9 +220,8 @@ class TestDdlStatements(TestDdlBase): @SkipIf.kudu_not_supported @UniqueDatabase.parametrize(sync_ddl=True) def test_create_kudu(self, vector, unique_database): - self.expected_exceptions = 2 vector.get_value('exec_option')['abort_on_error'] = False - self.run_test_case('QueryTest/create_kudu', vector, use_db=unique_database, + self.run_test_case('QueryTest/kudu_create', vector, use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector)) @UniqueDatabase.parametrize(sync_ddl=True) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/tests/metadata/test_show_create_table.py ---------------------------------------------------------------------- diff --git a/tests/metadata/test_show_create_table.py b/tests/metadata/test_show_create_table.py index 717223a..aae9f0c 100644 --- a/tests/metadata/test_show_create_table.py +++ b/tests/metadata/test_show_create_table.py @@ -54,11 +54,6 @@ class TestShowCreateTable(ImpalaTestSuite): self.__run_show_create_table_test_case('QueryTest/show-create-table', vector, unique_database) - @SkipIf.kudu_not_supported - def test_kudu_show_create_table(self, vector, unique_database): - self.__run_show_create_table_test_case('QueryTest/kudu-show-create', vector, - unique_database) - def __run_show_create_table_test_case(self, test_file_name, vector, unique_db_name): """ Runs a show-create-table test file, containing the following sections: http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/tests/query_test/test_kudu.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py index d791608..c22de3e 100644 --- a/tests/query_test/test_kudu.py +++ b/tests/query_test/test_kudu.py @@ -15,104 +15,427 @@ # specific language governing permissions and limitations # under the License. 
+from kudu.schema import ( + BOOL, + DOUBLE, + FLOAT, + INT16, + INT32, + INT64, + INT8, + STRING) +import logging import pytest -from copy import copy +import textwrap -from tests.beeswax.impala_beeswax import ImpalaBeeswaxException -from tests.common.impala_test_suite import ImpalaTestSuite -from tests.common.skip import SkipIf -from tests.common.test_dimensions import create_uncompressed_text_dimension -from tests.common.test_vector import TestDimension +from tests.common import KUDU_MASTER_HOSTS +from tests.common.kudu_test_suite import KuduTestSuite +LOG = logging.getLogger(__name__) -@SkipIf.kudu_not_supported -class TestKuduOperations(ImpalaTestSuite): +class TestKuduOperations(KuduTestSuite): """ This suite tests the different modification operations when using a kudu table. """ - @classmethod - def file_format_constraint(cls, v): - return v.get_value('table_format').file_format in ["parquet"] + def test_kudu_scan_node(self, vector, unique_database): + self.run_test_case('QueryTest/kudu-scan-node', vector, use_db=unique_database, + wait_secs_between_stmts=1) - @classmethod - def get_workload(cls): - return 'functional-query' + def test_kudu_crud(self, vector, unique_database): + self.run_test_case('QueryTest/kudu_crud', vector, use_db=unique_database, + wait_secs_between_stmts=1) - @classmethod - def add_test_dimensions(cls): - super(TestKuduOperations, cls).add_test_dimensions() - cls.TestMatrix.add_constraint(cls.file_format_constraint) - - # TODO(kudu-merge) IMPALA-3178 DROP DATABASE ... CASCADE is broken in Kudu so we need - # to clean up table-by-table. Once solved, delete this and rely on the overriden method. 
- def cleanup_db(self, db_name): - self.client.execute("use default") - self.client.set_configuration({'sync_ddl': True}) - if db_name + "\t" in self.client.execute("show databases", ).data: - # We use quoted identifiers to avoid name clashes with keywords - for tbl_name in self.client.execute("show tables in `" + db_name + "`").data: - full_tbl_name = '`%s`.`%s`' % (db_name, tbl_name) - result = self.client.execute("describe formatted " + full_tbl_name) - if 'VIRTUAL_VIEW' in '\n'.join(result.data): - self.client.execute("drop view " + full_tbl_name) - else: - self.client.execute("drop table " + full_tbl_name) - for fn_result in self.client.execute("show functions in `" + db_name + "`").data: - # First column is the return type, second is the function signature - fn_name = fn_result.split('\t')[1] - self.client.execute("drop function `%s`.%s" % (db_name, fn_name)) - for fn_result in self.client.execute(\ - "show aggregate functions in `" + db_name + "`").data: - fn_name = fn_result.split('\t')[1] - self.client.execute("drop function `%s`.%s" % (db_name, fn_name)) - self.client.execute("drop database `" + db_name + "`") - - def setup_method(self, method): - self.cleanup_db("kududb_test") - self.client.execute("create database kududb_test") - - def teardown_method(self, method): - self.cleanup_db("kududb_test") + def test_kudu_partition_ddl(self, vector, unique_database): + self.run_test_case('QueryTest/kudu_partition_ddl', vector, use_db=unique_database) - @pytest.mark.execute_serially - def test_kudu_scan_node(self, vector): - self.run_test_case('QueryTest/kudu-scan-node', vector, use_db="functional_kudu", - wait_secs_between_stmts=1) + def test_kudu_alter_table(self, vector, unique_database): + self.run_test_case('QueryTest/kudu_alter', vector, use_db=unique_database) - @pytest.mark.execute_serially - def test_insert_update_delete(self, vector): - self.run_test_case('QueryTest/kudu_crud', vector, use_db="kududb_test", - wait_secs_between_stmts=1) + def 
test_kudu_stats(self, vector, unique_database): + self.run_test_case('QueryTest/kudu_stats', vector, use_db=unique_database) - @pytest.mark.execute_serially - def test_kudu_partition_ddl(self, vector): - self.run_test_case('QueryTest/kudu_partition_ddl', vector, use_db="kududb_test") - @pytest.mark.execute_serially - def test_kudu_alter_table(self, vector): - self.run_test_case('QueryTest/kudu_alter', vector, use_db="kududb_test") +class TestCreateExternalTable(KuduTestSuite): - @pytest.mark.execute_serially - def test_kudu_stats(self, vector): - self.run_test_case('QueryTest/kudu_stats', vector, use_db="kududb_test") + def test_implicit_table_props(self, cursor, kudu_client): + """Check that table properties added internally during table creation are as + expected. + """ + with self.temp_kudu_table(kudu_client, [STRING, INT8, BOOL], num_key_cols=2) \ + as kudu_table: + impala_table_name = self.get_kudu_table_base_name(kudu_table.name) + props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name + cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (impala_table_name, + props)) + with self.drop_impala_table_after_context(cursor, impala_table_name): + cursor.execute("DESCRIBE FORMATTED %s" % impala_table_name) + table_desc = [[col.strip() if col else col for col in row] for row in cursor] + LOG.info(table_desc) + # Pytest shows truncated output on failure, so print the details just in case. + assert ["", "EXTERNAL", "TRUE"] in table_desc + assert ["", "kudu.master_addresses", KUDU_MASTER_HOSTS] in table_desc + assert ["", "kudu.table_name", kudu_table.name] in table_desc + assert ["", "storage_handler", "com.cloudera.kudu.hive.KuduStorageHandler"] \ + in table_desc + + def test_col_types(self, cursor, kudu_client): + """Check that a table can be created using all available column types.""" + # TODO: The python Kudu client doesn't yet support TIMESTAMP or BYTE[], those should + # be tested for graceful failure. 
+ kudu_types = [STRING, BOOL, DOUBLE, FLOAT, INT16, INT32, INT64, INT8] + with self.temp_kudu_table(kudu_client, kudu_types) as kudu_table: + impala_table_name = self.get_kudu_table_base_name(kudu_table.name) + props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name + cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (impala_table_name, + props)) + with self.drop_impala_table_after_context(cursor, impala_table_name): + cursor.execute("DESCRIBE %s" % impala_table_name) + kudu_schema = kudu_table.schema + for i, (col_name, col_type, _) in enumerate(cursor): + kudu_col = kudu_schema[i] + assert col_name == kudu_col.name + assert col_type.upper() == \ + self.kudu_col_type_to_impala_col_type(kudu_col.type.type) + + def test_drop_external_table(self, cursor, kudu_client): + """Check that dropping an external table only affects the catalog and does not delete + the table in Kudu. + """ + with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table: + impala_table_name = self.get_kudu_table_base_name(kudu_table.name) + props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name + cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (impala_table_name, + props)) + with self.drop_impala_table_after_context(cursor, impala_table_name): + cursor.execute("SELECT COUNT(*) FROM %s" % impala_table_name) + assert cursor.fetchall() == [(0, )] + try: + cursor.execute("SELECT COUNT(*) FROM %s" % impala_table_name) + assert False + except Exception as e: + assert "Could not resolve table reference" in str(e) + assert kudu_client.table_exists(kudu_table.name) + + def test_explicit_name(self, cursor, kudu_client): + """Check that a Kudu table can be specified using a table property.""" + with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table: + table_name = self.random_table_name() + cursor.execute(""" + CREATE EXTERNAL TABLE %s + STORED AS KUDU + TBLPROPERTIES('kudu.table_name' = '%s')""" % (table_name, kudu_table.name)) + with 
self.drop_impala_table_after_context(cursor, table_name): + cursor.execute("SELECT * FROM %s" % table_name) + assert len(cursor.fetchall()) == 0 + + def test_explicit_name_preference(self, cursor, kudu_client): + """Check that the table name from a table property is used when a table of the + implied name also exists. + """ + with self.temp_kudu_table(kudu_client, [INT64]) as preferred_kudu_table: + with self.temp_kudu_table(kudu_client, [INT8]) as other_kudu_table: + impala_table_name = self.get_kudu_table_base_name(other_kudu_table.name) + cursor.execute(""" + CREATE EXTERNAL TABLE %s + STORED AS KUDU + TBLPROPERTIES('kudu.table_name' = '%s')""" % ( + impala_table_name, preferred_kudu_table.name)) + with self.drop_impala_table_after_context(cursor, impala_table_name): + cursor.execute("DESCRIBE %s" % impala_table_name) + assert cursor.fetchall() == [("a", "bigint", "")] + + def test_explicit_name_doesnt_exist(self, cursor, kudu_client): + kudu_table_name = self.random_table_name() + try: + cursor.execute(""" + CREATE EXTERNAL TABLE %s + STORED AS KUDU + TBLPROPERTIES('kudu.table_name' = '%s')""" % ( + self.random_table_name(), kudu_table_name)) + except Exception as e: + assert "Table does not exist in Kudu: '%s'" % kudu_table_name in str(e) + + def test_explicit_name_doesnt_exist_but_implicit_does(self, cursor, kudu_client): + """Check that when an explicit table name is given but that table doesn't exist, + there is no fall-through to an existing implicit table. 
+ """ + with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table: + table_name = self.random_table_name() + try: + cursor.execute(""" + CREATE EXTERNAL TABLE %s + STORED AS KUDU + TBLPROPERTIES('kudu.table_name' = '%s')""" % ( + self.get_kudu_table_base_name(kudu_table.name), table_name)) + except Exception as e: + assert "Table does not exist in Kudu: '%s'" % table_name in str(e) + + +class TestShowCreateTable(KuduTestSuite): + + def assert_show_create_equals(self, cursor, create_sql, show_create_sql): + """Executes 'create_sql' to create a table, then runs "SHOW CREATE TABLE" and checks + that the output is the same as 'show_create_sql'. 'create_sql' and + 'show_create_sql' can be templates that can be used with str.format(). format() + will be called with 'table' and 'db' as keyword args. + """ + format_args = {"table": self.random_table_name(), "db": cursor.conn.db_name} + cursor.execute(create_sql.format(**format_args)) + cursor.execute("SHOW CREATE TABLE {table}".format(**format_args)) + assert cursor.fetchall()[0][0] == \ + textwrap.dedent(show_create_sql.format(**format_args)).strip() + + def test_primary_key_and_distribution(self, cursor): + # TODO: Add test cases with column comments once KUDU-1711 is fixed. + self.assert_show_create_equals(cursor, + """ + CREATE TABLE {table} (c INT PRIMARY KEY) + DISTRIBUTE BY HASH (c) INTO 3 BUCKETS STORED AS KUDU""", + """ + CREATE TABLE {db}.{{table}} ( + c INT, + PRIMARY KEY (c) + ) + DISTRIBUTE BY HASH (c) INTO 3 BUCKETS + STORED AS KUDU + TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format( + db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS)) + self.assert_show_create_equals(cursor, + """ + CREATE TABLE {table} (c INT PRIMARY KEY, d STRING) + DISTRIBUTE BY HASH (c) INTO 3 BUCKETS, RANGE (c) SPLIT ROWS ((1), (2)) + STORED AS KUDU""", + """ + CREATE TABLE {db}.{{table}} ( + c INT, + d STRING, + PRIMARY KEY (c) + ) + DISTRIBUTE BY HASH (c) INTO 3 BUCKETS, RANGE (c) SPLIT ROWS (...) 
+ STORED AS KUDU + TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format( + db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS)) + self.assert_show_create_equals(cursor, + """ + CREATE TABLE {table} (c INT, PRIMARY KEY (c)) + DISTRIBUTE BY HASH (c) INTO 3 BUCKETS STORED AS KUDU""", + """ + CREATE TABLE {db}.{{table}} ( + c INT, + PRIMARY KEY (c) + ) + DISTRIBUTE BY HASH (c) INTO 3 BUCKETS + STORED AS KUDU + TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format( + db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS)) + self.assert_show_create_equals(cursor, + """ + CREATE TABLE {table} (c INT, d STRING, PRIMARY KEY(c, d)) + DISTRIBUTE BY HASH (c) INTO 3 BUCKETS, HASH (d) INTO 3 BUCKETS, + RANGE (c, d) SPLIT ROWS ((1, 'aaa'), (2, 'bbb')) STORED AS KUDU""", + """ + CREATE TABLE {db}.{{table}} ( + c INT, + d STRING, + PRIMARY KEY (c, d) + ) + DISTRIBUTE BY HASH (c) INTO 3 BUCKETS, HASH (d) INTO 3 BUCKETS, RANGE (c, d) SPLIT ROWS (...) + STORED AS KUDU + TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format( + db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS)) + self.assert_show_create_equals(cursor, + """ + CREATE TABLE {table} (c INT, d STRING, e INT, PRIMARY KEY(c, d)) + DISTRIBUTE BY RANGE (c) SPLIT ROWS ((1), (2), (3)) STORED AS KUDU""", + """ + CREATE TABLE {db}.{{table}} ( + c INT, + d STRING, + e INT, + PRIMARY KEY (c, d) + ) + DISTRIBUTE BY RANGE (c) SPLIT ROWS (...) + STORED AS KUDU + TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format( + db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS)) + + def test_properties(self, cursor): + # If an explicit table name is used for the Kudu table and it differs from what + # would be the default Kudu table name, the name should be shown as a table property. 
+ kudu_table = self.random_table_name() + props = "'kudu.table_name'='%s'" % kudu_table + self.assert_show_create_equals(cursor, + """ + CREATE TABLE {{table}} (c INT PRIMARY KEY) + DISTRIBUTE BY HASH (c) INTO 3 BUCKETS + STORED AS KUDU + TBLPROPERTIES ({props})""".format(props=props), + """ + CREATE TABLE {db}.{{table}} ( + c INT, + PRIMARY KEY (c) + ) + DISTRIBUTE BY HASH (c) INTO 3 BUCKETS + STORED AS KUDU + TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}', {props})""".format( + db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS, props=props)) + # If the name is explicitly given (or not given at all) so that the name is the same + # as the default name, the table name is not shown. + props = "'kudu.table_name'='impala::{db}.{table}'" + self.assert_show_create_equals(cursor, + """ + CREATE TABLE {{table}} (c INT PRIMARY KEY) + DISTRIBUTE BY HASH (c) INTO 3 BUCKETS + STORED AS KUDU + TBLPROPERTIES ({props})""".format(props=props), + """ + CREATE TABLE {db}.{{table}} ( + c INT, + PRIMARY KEY (c) + ) + DISTRIBUTE BY HASH (c) INTO 3 BUCKETS + STORED AS KUDU + TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format( + db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS)) -@SkipIf.kudu_not_supported -class TestKuduMemLimits(ImpalaTestSuite): - QUERIES = ["select * from kudu_mem_limit.lineitem where l_orderkey = -1", - "select * from kudu_mem_limit.lineitem where l_commitdate like '%cheese'", - "select * from kudu_mem_limit.lineitem limit 90"] + +class TestDropDb(KuduTestSuite): + + def test_drop_non_empty_db(self, unique_cursor, kudu_client): + """Check that an attempt to drop a database will fail if Kudu tables are present + and that the tables remain. 
+ """ + db_name = unique_cursor.conn.db_name + with self.temp_kudu_table(kudu_client, [INT32], db_name=db_name) as kudu_table: + impala_table_name = self.get_kudu_table_base_name(kudu_table.name) + props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name + unique_cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % ( + impala_table_name, props)) + unique_cursor.execute("USE DEFAULT") + try: + unique_cursor.execute("DROP DATABASE %s" % db_name) + assert False + except Exception as e: + assert "One or more tables exist" in str(e) + unique_cursor.execute("SELECT COUNT(*) FROM %s.%s" % (db_name, impala_table_name)) + assert unique_cursor.fetchall() == [(0, )] + + def test_drop_db_cascade(self, unique_cursor, kudu_client): + """Check that an attempt to drop a database will succeed even if Kudu tables are + present and that the managed tables are removed. + """ + db_name = unique_cursor.conn.db_name + with self.temp_kudu_table(kudu_client, [INT32], db_name=db_name) as kudu_table: + # Create an external Kudu table + impala_table_name = self.get_kudu_table_base_name(kudu_table.name) + props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name + unique_cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % ( + impala_table_name, props)) + + # Create a managed Kudu table + managed_table_name = self.random_table_name() + unique_cursor.execute(""" + CREATE TABLE %s (a INT PRIMARY KEY) DISTRIBUTE BY HASH (a) INTO 3 BUCKETS + STORED AS KUDU TBLPROPERTIES ('kudu.table_name' = '%s')""" + % (managed_table_name, managed_table_name)) + assert kudu_client.table_exists(managed_table_name) + + # Create a table in HDFS + hdfs_table_name = self.random_table_name() + unique_cursor.execute(""" + CREATE TABLE %s (a INT) PARTITIONED BY (x INT)""" % (hdfs_table_name)) + + unique_cursor.execute("USE DEFAULT") + unique_cursor.execute("DROP DATABASE %s CASCADE" % db_name) + unique_cursor.execute("SHOW DATABASES") + assert db_name not in unique_cursor.fetchall() + 
assert kudu_client.table_exists(kudu_table.name) + assert not kudu_client.table_exists(managed_table_name) + +class TestImpalaKuduIntegration(KuduTestSuite): + def test_replace_kudu_table(self, cursor, kudu_client): + """Check that an external Kudu table is accessible if the underlying Kudu table is + modified using the Kudu client. + """ + # Create an external Kudu table + col_names = ['a'] + with self.temp_kudu_table(kudu_client, [INT32], col_names=col_names) as kudu_table: + impala_table_name = self.get_kudu_table_base_name(kudu_table.name) + props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name + cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % ( + impala_table_name, props)) + cursor.execute("DESCRIBE %s" % (impala_table_name)) + assert cursor.fetchall() == [("a", "int", "")] + + # Drop the underlying Kudu table and replace it with another Kudu table that has + # the same name but different schema + kudu_client.delete_table(kudu_table.name) + assert not kudu_client.table_exists(kudu_table.name) + new_col_names = ['b', 'c'] + name_parts = kudu_table.name.split(".") + assert len(name_parts) == 2 + with self.temp_kudu_table(kudu_client, [STRING, STRING], col_names=new_col_names, + db_name=name_parts[0], name= name_parts[1]) as new_kudu_table: + assert kudu_client.table_exists(new_kudu_table.name) + # Refresh the external table and verify that the new schema is loaded from + # Kudu. + cursor.execute("REFRESH %s" % (impala_table_name)) + cursor.execute("DESCRIBE %s" % (impala_table_name)) + assert cursor.fetchall() == [("b", "string", ""), ("c", "string", "")] + + def test_delete_external_kudu_table(self, cursor, kudu_client): + """Check that Impala can recover from the case where the underlying Kudu table of + an external table is dropped using the Kudu client. The external table can be + dropped using DROP TABLE IF EXISTS statement. 
+ """ + with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table: + # Create an external Kudu table + impala_table_name = self.get_kudu_table_base_name(kudu_table.name) + props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name + cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % ( + impala_table_name, props)) + cursor.execute("DESCRIBE %s" % (impala_table_name)) + assert cursor.fetchall() == [("a", "int", "")] + # Drop the underlying Kudu table + kudu_client.delete_table(kudu_table.name) + assert not kudu_client.table_exists(kudu_table.name) + err_msg = 'The table does not exist: table_name: "%s"' % (kudu_table.name) + try: + cursor.execute("REFRESH %s" % (impala_table_name)) + except Exception as e: + assert err_msg in str(e) + cursor.execute("DROP TABLE IF EXISTS %s" % (impala_table_name)) + cursor.execute("SHOW TABLES") + assert impala_table_name not in cursor.fetchall() + + def test_delete_managed_kudu_table(self, cursor, kudu_client, unique_database): + """Check that dropping a managed Kudu table works even if the underlying Kudu table + has been dropped externally.""" + impala_tbl_name = "foo" + cursor.execute("""CREATE TABLE %s.%s (a INT PRIMARY KEY) DISTRIBUTE BY HASH (a) + INTO 3 BUCKETS STORED AS KUDU""" % (unique_database, impala_tbl_name)) + kudu_tbl_name = "impala::%s.%s" % (unique_database, impala_tbl_name) + assert kudu_client.table_exists(kudu_tbl_name) + kudu_client.delete_table(kudu_tbl_name) + assert not kudu_client.table_exists(kudu_tbl_name) + cursor.execute("DROP TABLE IF EXISTS %s" % (impala_tbl_name)) + cursor.execute("SHOW TABLES") + assert impala_tbl_name not in cursor.fetchall() + +class TestKuduMemLimits(KuduTestSuite): + + QUERIES = ["select * from lineitem where l_orderkey = -1", + "select * from lineitem where l_commitdate like '%cheese'", + "select * from lineitem limit 90"] # The value indicates the minimum memory requirements for the queries above, the first # memory limit corresponds to the first query 
QUERY_MEM_LIMITS = [1, 1, 10] - # The values from this array are used as a mem_limit test dimension - TEST_LIMITS = [1, 10, 0] - CREATE = """ - CREATE TABLE kudu_mem_limit.lineitem ( + CREATE TABLE lineitem ( l_orderkey BIGINT, l_linenumber INT, l_partkey BIGINT, @@ -128,88 +451,41 @@ class TestKuduMemLimits(ImpalaTestSuite): l_receiptdate STRING, l_shipinstruct STRING, l_shipmode STRING, - l_comment STRING - ) - DISTRIBUTE BY HASH (l_orderkey) INTO 9 BUCKETS - TBLPROPERTIES( - 'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler', - 'kudu.table_name' = 'tpch_lineitem', - 'kudu.master_addresses' = '127.0.0.1', - 'kudu.key_columns' = 'l_orderkey,l_linenumber' - ) - """ + l_comment STRING, + PRIMARY KEY (l_orderkey, l_linenumber)) + DISTRIBUTE BY HASH (l_orderkey, l_linenumber) INTO 3 BUCKETS + STORED AS KUDU""" LOAD = """ - insert into kudu_mem_limit.lineitem + insert into lineitem select l_orderkey, l_linenumber, l_partkey, l_suppkey, cast(l_quantity as double), cast(l_extendedprice as double), cast(l_discount as double), cast(l_tax as double), l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, - l_shipmode, l_comment from tpch_parquet.lineitem; - """ - - @classmethod - def get_workload(cls): - return 'functional-query' + l_shipmode, l_comment from tpch_parquet.lineitem""" @classmethod - def add_test_dimensions(cls): - super(TestKuduMemLimits, cls).add_test_dimensions() + def auto_create_db(cls): + return True - # add mem_limit as a test dimension. 
- new_dimension = TestDimension('mem_limit', *TestKuduMemLimits.TEST_LIMITS) - cls.TestMatrix.add_dimension(new_dimension) - cls.TestMatrix.add_dimension(create_uncompressed_text_dimension(cls.get_workload())) - - @classmethod - def setup_class(cls): - super(TestKuduMemLimits, cls).setup_class() - cls.cleanup_db("kudu_mem_limit") - cls.client.execute("create database kudu_mem_limit") - cls.client.execute(cls.CREATE) - cls.client.execute(cls.LOAD) - - @classmethod - def teardown_class(cls): - cls.cleanup_db("kudu_mem_limit") - super(TestKuduMemLimits, cls).teardown_class() - - # TODO(kudu-merge) IMPALA-3178 DROP DATABASE ... CASCADE is broken in Kudu so we need - # to clean up table-by-table. Once solved, delete this and rely on the overriden method. - @classmethod - def cleanup_db(cls, db_name): - cls.client.execute("use default") - cls.client.set_configuration({'sync_ddl': True}) - if db_name + "\t" in cls.client.execute("show databases", ).data: - # We use quoted identifiers to avoid name clashes with keywords - for tbl_name in cls.client.execute("show tables in `" + db_name + "`").data: - full_tbl_name = '`%s`.`%s`' % (db_name, tbl_name) - result = cls.client.execute("describe formatted " + full_tbl_name) - if 'VIRTUAL_VIEW' in '\n'.join(result.data): - cls.client.execute("drop view " + full_tbl_name) - else: - cls.client.execute("drop table " + full_tbl_name) - for fn_result in cls.client.execute("show functions in `" + db_name + "`").data: - # First column is the return type, second is the function signature - fn_name = fn_result.split('\t')[1] - cls.client.execute("drop function `%s`.%s" % (db_name, fn_name)) - for fn_result in cls.client.execute(\ - "show aggregate functions in `" + db_name + "`").data: - fn_name = fn_result.split('\t')[1] - cls.client.execute("drop function `%s`.%s" % (db_name, fn_name)) - cls.client.execute("drop database `" + db_name + "`") + @pytest.fixture(scope='class') + def test_data(cls, cls_cursor): + cls_cursor.execute(cls.CREATE) 
+ cls_cursor.execute(cls.LOAD) @pytest.mark.execute_serially - def test_low_mem_limit_low_selectivity_scan(self, vector): + @pytest.mark.usefixtures("test_data") + @pytest.mark.parametrize("mem_limit", [1, 10, 0]) + def test_low_mem_limit_low_selectivity_scan(self, cursor, mem_limit, vector): """Tests that the queries specified in this test suite run under the given memory limits.""" - mem_limit = copy(vector.get_value('mem_limit')) - exec_options = copy(vector.get_value('exec_option')) + exec_options = dict((k, str(v)) for k, v + in vector.get_value('exec_option').iteritems()) exec_options['mem_limit'] = "{0}m".format(mem_limit) for i, q in enumerate(self.QUERIES): try: - self.execute_query(q, exec_options) - pass - except ImpalaBeeswaxException as e: + cursor.execute(q, configuration=exec_options) + cursor.fetchall() + except Exception as e: if (mem_limit > self.QUERY_MEM_LIMITS[i]): raise assert "Memory limit exceeded" in str(e)
