Repository: incubator-impala Updated Branches: refs/heads/master 7c0018881 -> 58d8861c5
IMPALA-4338: test infra data migrator: include tables' primary keys in PostgreSQL

This patch enables the test infrastructure's Impala-to-PostgreSQL data migration tool to recognize whether the Impala source tables have primary keys and, if so, to CREATE the tables in PostgreSQL with the same primary keys. This is needed especially so that the random query generator can perform CRUD operations on Impala/Kudu tables and compare the results against equivalent PostgreSQL tables.

I modified the make_create_table_sql() implementation to check the "universal" Python object model of the table's columns, so that it generates CREATE TABLE statements with or without a PRIMARY KEY clause, as appropriate. For Impala-side tables that this tool may create, it also ensures that such a clause is written only when the table's storage format supports primary keys (currently Kudu).

When the random query generator runs, it needs to know that the tables it is examining in both databases are equivalent. It does this by comparing the tables' names, column names, and column types. I have added the tables' primary keys to this equivalence check.

Testing:
- The patch includes some unit and system tests for the tool.
- Actually migrated a few small Kudu and HDFS tables from Impala into both PostgreSQL 9.3 and 9.5, and examined the tables in PostgreSQL to make sure they had primary keys (or not) as expected.
- Ran very short discrepancy_searcher.py --explain-only sessions to test positive and negative cases of Impala/PostgreSQL equivalence.
Change-Id: I447f022e2dc3d4fc8373b7f388c7875a869921b8 Reviewed-on: http://gerrit.cloudera.org:8080/4951 Reviewed-by: Taras Bobrovytsky <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/822b2ca1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/822b2ca1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/822b2ca1 Branch: refs/heads/master Commit: 822b2ca1c6a9bb09328857b59851035260be851f Parents: 7c00188 Author: Michael Brown <[email protected]> Authored: Tue Nov 1 17:54:14 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Sun Nov 6 06:50:48 2016 +0000 ---------------------------------------------------------------------- tests/comparison/db_connection.py | 88 ++++++++++++-- tests/comparison/tests/fake_query.py | 9 +- tests/comparison/tests/test_cursor.py | 184 +++++++++++++++++++++++++++++ 3 files changed, 271 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/822b2ca1/tests/comparison/db_connection.py ---------------------------------------------------------------------- diff --git a/tests/comparison/db_connection.py b/tests/comparison/db_connection.py index 5c51ff3..8e35031 100644 --- a/tests/comparison/db_connection.py +++ b/tests/comparison/db_connection.py @@ -41,6 +41,7 @@ from pyparsing import ( Suppress, Word) from tempfile import gettempdir +from textwrap import dedent from threading import Lock from time import time @@ -113,6 +114,14 @@ class DbCursor(object): ' It has a different number of columns across databases.', table_name) mismatch = True break + if common_table.primary_key_names != table.primary_key_names: + LOG.debug( + 'Ignoring table {name} because of differing primary keys: ' + '{common_table_keys} vs. 
{table_keys}'.format( + name=table_name, common_table_keys=common_table.primary_key_names, + table_keys=table.primary_key_names)) + mismatch = True + break for left, right in izip(common_table.cols, table.cols): if not left.name == right.name and left.type == right.type: LOG.debug('Ignoring table %s. It has different columns %s vs %s.' % @@ -438,13 +447,34 @@ class DbCursor(object): LOG.debug('Created table %s', table.name) def make_create_table_sql(self, table): - sql = 'CREATE TABLE %s (%s)' % ( - table.name, - ', '.join('%s %s' % - (col.name, self.get_sql_for_data_type(col.exact_type)) + - ('' if self.conn.data_types_are_implictly_nullable else ' NULL') - for col in table.cols)) - return sql + column_declarations = [] + primary_key_names = [] + for col in table.cols: + if col.is_primary_key: + null_constraint = '' + primary_key_names.append(col.name) + elif self.conn.data_types_are_implictly_nullable: + null_constraint = '' + else: + null_constraint = ' NULL' + column_declaration = '{name} {col_type}{null_constraint}'.format( + name=col.name, col_type=self.get_sql_for_data_type(col.exact_type), + null_constraint=null_constraint) + column_declarations.append(column_declaration) + + if primary_key_names: + primary_key_constraint = ', PRIMARY KEY ({keys})'.format( + keys=', '.join(primary_key_names)) + else: + primary_key_constraint = '' + + create_table = ('CREATE TABLE {table_name} (' + '{all_columns}' + '{primary_key_constraint}' + ')'.format( + table_name=table.name, all_columns=', '.join(column_declarations), + primary_key_constraint=primary_key_constraint)) + return create_table def get_sql_for_data_type(self, data_type): if issubclass(data_type, VarChar): @@ -655,6 +685,7 @@ class DbConnection(object): class ImpalaCursor(DbCursor): PK_SEARCH_PATTERN = re.compile('PRIMARY KEY \((?P<keys>.*?)\)') + STORAGE_FORMATS_WITH_PRIMARY_KEYS = ('KUDU',) @classmethod def make_insert_sql_from_data(cls, table, rows): @@ -757,6 +788,28 @@ class ImpalaCursor(DbCursor): def 
make_create_table_sql(self, table): sql = super(ImpalaCursor, self).make_create_table_sql(table) + + if table.primary_keys: + if table.storage_format in ImpalaCursor.STORAGE_FORMATS_WITH_PRIMARY_KEYS: + # IMPALA-4424 adds support for parametrizing the partitions; for now, on our + # small scale, this is ok, especially since the model is to migrate tables from + # Impala into Postgres anyway. 3 was chosen for the buckets because our + # minicluster tends to have 3 tablet servers, but otherwise it's arbitrary and + # provides valid syntax for creating Kudu tables in Impala. + sql += '\nDISTRIBUTE BY HASH ({col}) INTO 3 BUCKETS'.format( + col=table.primary_key_names[0]) + else: + raise Exception( + 'table representation has primary keys {keys} but is not in a format that ' + 'supports them: {storage_format}'.format( + keys=str(table.primary_key_names), + storage_format=table.storage_format)) + elif table.storage_format in ImpalaCursor.STORAGE_FORMATS_WITH_PRIMARY_KEYS: + raise Exception( + 'table representation has storage format {storage_format} ' + 'but does not have any primary keys'.format( + storage_format=table.storage_format)) + if table.storage_format != 'TEXTFILE': sql += "\nSTORED AS " + table.storage_format if table.storage_location: @@ -947,6 +1000,27 @@ class PostgresqlCursor(DbCursor): return 'VARCHAR(%s)' % String.MAX return super(PostgresqlCursor, self).get_sql_for_data_type(data_type) + def _fetch_primary_key_names(self, table_name): + # see: + # https://www.postgresql.org/docs/9.5/static/infoschema-key-column-usage.html + # https://www.postgresql.org/docs/9.5/static/infoschema-table-constraints.html + sql = dedent(''' + SELECT + key_cols.column_name AS column_name + FROM + information_schema.key_column_usage key_cols, + information_schema.table_constraints table_constraints + WHERE + key_cols.constraint_catalog = table_constraints.constraint_catalog AND + key_cols.table_name = table_constraints.table_name AND + key_cols.constraint_name = 
table_constraints.constraint_name AND + table_constraints.constraint_type = 'PRIMARY KEY' AND + key_cols.table_name = '{table_name}' + ORDER BY key_cols.ordinal_position'''.format(table_name=table_name)) + self.execute(sql) + rows = self.fetchall() + return tuple(row[0] for row in rows) + class PostgresqlConnection(DbConnection): http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/822b2ca1/tests/comparison/tests/fake_query.py ---------------------------------------------------------------------- diff --git a/tests/comparison/tests/fake_query.py b/tests/comparison/tests/fake_query.py index 1241b73..3bf1410 100644 --- a/tests/comparison/tests/fake_query.py +++ b/tests/comparison/tests/fake_query.py @@ -37,17 +37,19 @@ from tests.comparison.funcs import AnalyticFirstValue from tests.comparison.query import Query, SelectClause, SelectItem -def FakeColumn(name, type_): +def FakeColumn(name, type_, is_primary_key=False): """ Return a Column, the creation of which allows the user not to have to specify the first argument, which is the table to which the column belongs. Typical use should be when creating a FakeTable, use FakeColumns as arguments. """ - return Column(None, name, type_) + col = Column(None, name, type_) + col.is_primary_key = is_primary_key + return col -def FakeTable(name, fake_columns): +def FakeTable(name, fake_columns, storage_format='TEXTFILE'): """ Return a Table consisting of one or more FakeColumns. Because Columns are added via method, we support nesting here instead. 
@@ -57,6 +59,7 @@ def FakeTable(name, fake_columns): raise Exception('You must supply at least one FakeColumn argument') for fake_column in fake_columns: table.add_col(fake_column) + table.storage_format = storage_format return table http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/822b2ca1/tests/comparison/tests/test_cursor.py ---------------------------------------------------------------------- diff --git a/tests/comparison/tests/test_cursor.py b/tests/comparison/tests/test_cursor.py new file mode 100644 index 0000000..972a7f2 --- /dev/null +++ b/tests/comparison/tests/test_cursor.py @@ -0,0 +1,184 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +import pytest + +from tests.comparison.db_connection import ImpalaConnection, PostgresqlConnection +from tests.comparison.db_connection import ImpalaCursor, PostgresqlCursor +from tests.comparison.db_types import Char, Int +from fake_query import FakeColumn, FakeTable + + +# These are fake connection objects that provide some data needed to "unit test" infra +# cursor implementations. 
+class FakeImpalaConnection(ImpalaConnection): + def _connect(self): + pass + + +class FakePostgresqlConnection(PostgresqlConnection): + def _connect(self): + pass + + [email protected] +def unittest_cursor_map(request): + """ + Given a dictionary parameter to the fixture with the class key/value set as a cursor + class, return a connectionless, "fake" cursor. The cursor is accessible via the + fixture's 'cursor' key in the test methods. + + The fixture parameter dictionary can be annotated with other arbitrary key/values that + describe expectations for that particular cursor and test case. + """ + connection_map = { + ImpalaCursor: FakeImpalaConnection, + PostgresqlCursor: FakePostgresqlConnection + } + cursor_class = request.param['class'] + connection_class = connection_map[cursor_class] + fake_connection_obj = connection_class() + request.param['cursor'] = cursor_class(fake_connection_obj, None) + return request.param + + [email protected]_fixture +def postgresql_cursor(): + """ + Yield a PostgresqlCursor object needed for test infra system tests. 
+ """ + with PostgresqlConnection() as conn: + cursor = conn.cursor() + try: + yield cursor + finally: + cursor.close() + + [email protected]( + 'unittest_cursor_map', [ + {'class': ImpalaCursor, + 'sql': {'two_cols': 'CREATE TABLE two_cols (col1 INT, col2 INT)', + 'one_pk': 'CREATE TABLE one_pk (col1 INT, col2 INT, ' + 'PRIMARY KEY (col1))\n' + 'DISTRIBUTE BY HASH (col1) INTO 3 BUCKETS\n' + 'STORED AS KUDU', + 'three_pks': 'CREATE TABLE three_pks (col1 INT, col2 CHAR(255), ' + 'col3 INT, col4 INT, PRIMARY KEY (col1, col2, col3))\n' + 'DISTRIBUTE BY HASH (col1) INTO 3 BUCKETS\n' + 'STORED AS KUDU'}}, + {'class': PostgresqlCursor, + 'sql': {'two_cols': 'CREATE TABLE two_cols (col1 INTEGER NULL, ' + 'col2 INTEGER NULL)', + 'one_pk': 'CREATE TABLE one_pk (col1 INTEGER, col2 INTEGER NULL, ' + 'PRIMARY KEY (col1))', + 'three_pks': 'CREATE TABLE three_pks (col1 INTEGER, col2 CHAR(255), ' + 'col3 INTEGER, col4 INTEGER NULL, ' + 'PRIMARY KEY (col1, col2, col3))'}}, + ], indirect=True) [email protected]( + 'table_data', [ + {'descr': 'two_cols', + 'data': FakeTable('two_cols', (FakeColumn('col1', Int), + FakeColumn('col2', Int)))}, + {'descr': 'one_pk', + 'data': FakeTable('one_pk', (FakeColumn('col1', Int, is_primary_key=True), + FakeColumn('col2', Int)), + storage_format='KUDU')}, + {'descr': 'three_pks', + 'data': FakeTable('three_pks', (FakeColumn('col1', Int, is_primary_key=True), + FakeColumn('col2', Char, is_primary_key=True), + FakeColumn('col3', Int, is_primary_key=True), + FakeColumn('col4', Int)), + storage_format='KUDU')}, + ]) +def test_make_create_table_sql(unittest_cursor_map, table_data): + """ + Test that for a given Table representation, both the Impala and PostgreSQL cursor + objects generate valid SQL for creating their tables. 
+ """ + which_sql = table_data['descr'] + table_object = table_data['data'] + expected_sql = unittest_cursor_map['sql'][which_sql] + cursor = unittest_cursor_map['cursor'] + assert expected_sql == cursor.make_create_table_sql(table_object) + + [email protected]( + 'unittest_cursor_map', [ + {'class': ImpalaCursor, + 'exceptions_text': { + 'text_with_pk': "table representation has primary keys ('col1',) but is " + 'not in a format that supports them: TEXTFILE', + 'kudu_without_pk': 'table representation has storage format KUDU but does ' + 'not have any primary keys'}} + ], indirect=True) [email protected]( + 'table_data', [ + {'descr': 'text_with_pk', + 'data': FakeTable('two_cols', (FakeColumn('col1', Int, is_primary_key=True), + FakeColumn('col2', Int)))}, + {'descr': 'kudu_without_pk', + 'data': FakeTable('two_cols', (FakeColumn('col1', Int), + FakeColumn('col2', Int)), + storage_format='KUDU')}, + + ]) +def test_invalid_tables(unittest_cursor_map, table_data): + """ + Test that for a given invalid Table representation considering primary keys and + storage formats, the ImpalaCursor properly fails to generate the SQL needed to create + the table. + """ + table_object = table_data['data'] + cursor = unittest_cursor_map['cursor'] + which_test = table_data['descr'] + expected_exception_text = unittest_cursor_map['exceptions_text'][which_test] + # http://doc.pytest.org/en/latest/assert.html#assertions-about-expected-exceptions + with pytest.raises(Exception) as excinfo: + cursor.make_create_table_sql(table_object) + assert str(excinfo.value) == expected_exception_text + + +# TODO: It's not worth it now, but in the future, if we need to interact with postgresql +# with a bunch of infra tests, we should consider some more sophisticated data +# structures and fixtures to handle things in parallel and to reduce code reuse. 
[email protected]( + 'sql_primary_key_map', [ + {'sql': 'CREATE TABLE mytable (col1 INTEGER NULL, col2 INTEGER NULL)', + 'primary_keys': ()}, + {'sql': ('CREATE TABLE mytable (col1 INTEGER, col2 INTEGER NULL, ' + 'PRIMARY KEY (col1))'), + 'primary_keys': ('col1',)}, + {'sql': ('CREATE TABLE mytable (col1 INTEGER, col2 CHAR(255), col3 INTEGER, ' + 'col4 INTEGER NULL, PRIMARY KEY (col1, col2, col3))'), + 'primary_keys': ('col1', 'col2', 'col3')} + ]) +def test_postgres_table_reading(postgresql_cursor, sql_primary_key_map): + """ + Test that when a postgres table is read by the Postgresql cursor, the Table object + contains the correct primary keys. This tests interacts with the local PostgreSQL + database that's part of the minicluster. + """ + try: + postgresql_cursor.execute('DROP TABLE IF EXISTS mytable') + postgresql_cursor.execute(sql_primary_key_map['sql']) + table = postgresql_cursor.describe_table('mytable') + assert table.primary_key_names == sql_primary_key_map['primary_keys'] + finally: + postgresql_cursor.execute('DROP TABLE mytable')
