IMPALA-4352: test infra: store Impala/Kudu primary keys in object model

Test infrastructure, including the random query generator and the data
migrator, needs to know the primary keys of Impala/Kudu tables. This
test infrastructure keeps Python object models of the tables and
columns. This patch adds the ability to read the primary keys of source
Impala/Kudu tables via SHOW CREATE TABLE and store them as proper
attributes. The patch also adds tests that ensure the test
infrastructure is always able to read and store the primary keys. This
helps find breakages sooner rather than later. For example, if a
regression in "SHOW CREATE TABLE" or in the test infrastructure makes us
unable to parse primary keys, GVO or other CI will catch the breakage
sooner than a query generator run would.

I also fixed some flake8 issues in the files I touched. Several files
had many whitespace warnings; I did not fix all of them, to keep the
patch from getting too large.

Change-Id: Ib654b6cd0e8c2a172ffb7330497be4d4a751e6e5
Reviewed-on: http://gerrit.cloudera.org:8080/4873
Reviewed-by: Michael Brown <[email protected]>
Reviewed-by: Taras Bobrovytsky <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ac516670
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ac516670
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ac516670

Branch: refs/heads/hadoop-next
Commit: ac516670b66b48b54783be6d5b8816ae2273963c
Parents: 50f7753
Author: Michael Brown <[email protected]>
Authored: Wed Oct 26 15:38:38 2016 -0700
Committer: Internal Jenkins <[email protected]>
Committed: Sat Nov 5 19:27:17 2016 +0000

----------------------------------------------------------------------
 tests/comparison/common.py               | 46 +++++++++++++++++--
 tests/comparison/db_connection.py        | 58 +++++++++++++++++++-----
 tests/conftest.py                        | 37 +++++++++++++---
 tests/metadata/test_show_create_table.py | 63 +++++++++++++++++++++++----
 4 files changed, 175 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac516670/tests/comparison/common.py
----------------------------------------------------------------------
diff --git a/tests/comparison/common.py b/tests/comparison/common.py
index 9c70fa4..376243d 100644
--- a/tests/comparison/common.py
+++ b/tests/comparison/common.py
@@ -27,10 +27,13 @@ from copy import deepcopy
 # lazyily imported using the following function. Keep in mind that python 
"globals" are
 # module local, there is no such thing as a cross-module global.
 __ALREADY_IMPORTED = False
+
+
 def get_import(name):
+  # noqa: these imports look unused to flake8 but are exported via globals() below
   global __ALREADY_IMPORTED
   if not __ALREADY_IMPORTED:
-    from db_types import (
+    from db_types import (  # noqa
         BigInt,
         Boolean,
         Char,
@@ -41,13 +44,14 @@ def get_import(name):
         JOINABLE_TYPES,
         Number,
         Timestamp)
-    from funcs import AggFunc, AnalyticFunc, Func
-    from query import InlineView, Subquery, WithClauseInlineView
+    from funcs import AggFunc, AnalyticFunc, Func  # noqa
+    from query import InlineView, Subquery, WithClauseInlineView  # noqa
     for key, value in locals().items():
       globals()[key] = value
     __ALREADY_IMPORTED = True
   return globals()[name]
 
+
 class ValExpr(object):
   '''This is class that represents a generic expr that results in a scalar.'''
 
@@ -259,6 +263,7 @@ class Column(ValExpr):
     self.owner = owner
     self.name = name
     self._exact_type = exact_type
+    self.is_primary_key = False
 
   @property
   def exact_type(self):
@@ -489,7 +494,7 @@ class Table(TableExpr):
 
   def __init__(self, name):
     self.name = name
-    self._cols = [] # can include CollectionColumns and StructColumns
+    self._cols = []  # can include CollectionColumns and StructColumns
     self._unique_cols = []
     self.alias = None
     self.is_visible = True   # Tables used in SEMI or ANTI JOINs are invisible
@@ -512,6 +517,39 @@ class Table(TableExpr):
     return self.alias or self.name
 
   @property
+  def primary_keys(self):
+    """
+    Return an immutable tuple of this table's primary-key columns, in column order.
+    """
+    return tuple(col for col in self._cols if col.is_primary_key)
+
+  @property
+  def primary_key_names(self):
+    """
+    Return an immutable tuple of the primary-key column names, in column order.
+    """
+    return tuple(col.name for col in self.primary_keys)
+
+  @property
+  def updatable_columns(self):
+    """
+    Return immutable sequence of columns that may be updated (i.e., not 
primary keys).
+
+    If the table doesn't have primary keys, no columns are updatable.
+    """
+    if self.primary_keys:
+      return tuple(col for col in self._cols if not col.is_primary_key)
+    else:
+      return ()
+
+  @property
+  def updatable_column_names(self):
+    """
+    Return an immutable tuple of the names of the columns in updatable_columns.
+    """
+    return tuple(col.name for col in self.updatable_columns)
+
+  @property
   def cols(self):
     result = ValExprList()
     for col in self._cols:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac516670/tests/comparison/db_connection.py
----------------------------------------------------------------------
diff --git a/tests/comparison/db_connection.py 
b/tests/comparison/db_connection.py
index 9076da8..5c51ff3 100644
--- a/tests/comparison/db_connection.py
+++ b/tests/comparison/db_connection.py
@@ -23,6 +23,7 @@
 '''
 import hashlib
 import impala.dbapi
+import re
 import shelve
 from abc import ABCMeta, abstractmethod
 from contextlib import closing
@@ -39,7 +40,6 @@ from pyparsing import (
     nums,
     Suppress,
     Word)
-from re import compile
 from tempfile import gettempdir
 from threading import Lock
 from time import time
@@ -74,6 +74,7 @@ MYSQL = "MYSQL"
 ORACLE = "ORACLE"
 POSTGRESQL = "POSTGRESQL"
 
+
 class DbCursor(object):
   '''Wraps a DB API 2 cursor to provide access to the related conn. This class
      implements the DB API 2 interface by delegation.
@@ -125,10 +126,10 @@ class DbCursor(object):
 
     return tables
 
-  SQL_TYPE_PATTERN = compile(r'([^()]+)(\((\d+,? ?)*\))?')
+  SQL_TYPE_PATTERN = re.compile(r'([^()]+)(\((\d+,? ?)*\))?')
   TYPE_NAME_ALIASES = \
       dict((type_.name().upper(), type_.name().upper()) for type_ in 
EXACT_TYPES)
-  TYPES_BY_NAME =  dict((type_.name().upper(), type_) for type_ in EXACT_TYPES)
+  TYPES_BY_NAME = dict((type_.name().upper(), type_) for type_ in EXACT_TYPES)
   EXACT_TYPES_TO_SQL = dict((type_, type_.name().upper()) for type_ in 
EXACT_TYPES)
 
   @classmethod
@@ -396,11 +397,13 @@ class DbCursor(object):
     raise Exception('unable to parse: {0}, type: {1}'.format(col_name, 
col_type))
 
   def create_table_from_describe(self, table_name, describe_rows):
+    primary_key_names = self._fetch_primary_key_names(table_name)
     table = Table(table_name.lower())
     for row in describe_rows:
       col_name, data_type = row[:2]
       col_type = self.parse_col_desc(data_type)
       col = self.create_column(col_name, col_type)
+      col.is_primary_key = col_name in primary_key_names
       table.add_col(col)
     return table
 
@@ -526,6 +529,16 @@ class DbCursor(object):
         unique_cols.append(cols)
       table.unique_cols = unique_cols
 
+  def _fetch_primary_key_names(self, table_name):
+    """
+    This must return a tuple of strings representing the primary keys of 
table_name,
+    or an empty tuple if there are no primary keys.
+    """
+    # This is the base method. Since we haven't tested this on Oracle or MySQL 
or plan
+    # to implement this for those databases, the base method needs to return 
an empty
+    # tuple.
+    return ()
+
 
 class DbConnection(object):
 
@@ -558,7 +571,7 @@ class DbConnection(object):
         try:
           unlink(link)
         except OSError as e:
-          if not 'No such file' in str(e):
+          if 'No such file' not in str(e):
             raise e
         try:
           symlink(sql_log_path, link)
@@ -641,6 +654,8 @@ class DbConnection(object):
 
 class ImpalaCursor(DbCursor):
 
+  PK_SEARCH_PATTERN = re.compile(r'PRIMARY KEY \((?P<keys>.*?)\)', re.DOTALL)
+
   @classmethod
   def make_insert_sql_from_data(cls, table, rows):
     if not rows:
@@ -673,6 +688,31 @@ class ImpalaCursor(DbCursor):
   def cluster(self):
     return self.conn.cluster
 
+  def _fetch_primary_key_names(self, table_name):
+    self.execute("SHOW CREATE TABLE {0}".format(table_name))
+    # This returns 1 column with 1 multiline string row, resembling:
+    #
+    # CREATE TABLE db.table (
+    #   pk1 BIGINT,
+    #   pk2 BIGINT,
+    #   col BIGINT,
+    #   PRIMARY KEY (pk1, pk2)
+    # )
+    #
+    # Even a 1-column primary key will be shown as a PRIMARY KEY constraint, 
like:
+    #
+    # CREATE TABLE db.table (
+    #   pk1 BIGINT,
+    #   col BIGINT,
+    #   PRIMARY KEY (pk1)
+    # )
+    (raw_result,) = self.fetchone()
+    search_result = ImpalaCursor.PK_SEARCH_PATTERN.search(raw_result)
+    if search_result is None:
+      return ()
+    else:
+      return tuple(key.strip() for key in search_result.group("keys").split(","))
+
   def invalidate_metadata(self, table_name=None):
     self.execute("INVALIDATE METADATA %s" % (table_name or ""))
 
@@ -958,6 +998,10 @@ class MySQLConnection(DbConnection):
   PORT = 3306
   USER_NAME = "root"
 
+  def __init__(self, client, conn, db_name=None):
+    DbConnection.__init__(self, client, conn, db_name=db_name)
+    self._session_id = self.execute_and_fetchall('SELECT 
connection_id()')[0][0]
+
   def _connect(self):
     try:
       import MySQLdb
@@ -973,12 +1017,6 @@ class MySQLConnection(DbConnection):
         db=self.db_name)
     self._conn.autocommit = True
 
-class MySQLConnection(DbConnection):
-
-  def __init__(self, client, conn, db_name=None):
-    DbConnection.__init__(self, client, conn, db_name=db_name)
-    self._session_id = self.execute_and_fetchall('SELECT 
connection_id()')[0][0]
-
   @property
   def supports_kill_connection(self):
     return True

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac516670/tests/conftest.py
----------------------------------------------------------------------
diff --git a/tests/conftest.py b/tests/conftest.py
index c22de39..c75f117 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -30,6 +30,7 @@ import pytest
 from common import KUDU_MASTER_HOSTS
 from common.test_result_verifier import QueryTestResult
 from tests.common.patterns import is_valid_impala_identifier
+from tests.comparison.db_connection import ImpalaConnection
 from tests.util.filesystem_utils import FILESYSTEM, ISILON_WEBHDFS_PORT
 
 logging.basicConfig(level=logging.INFO, format='%(threadName)s: %(message)s')
@@ -246,9 +247,12 @@ def unique_database(request, testid_checksum):
   num_dbs = 1
   fixture_params = getattr(request, 'param', None)
   if fixture_params is not None:
-    if "name_prefix" in fixture_params: db_name_prefix = 
fixture_params["name_prefix"]
-    if "sync_ddl" in fixture_params: sync_ddl = fixture_params["sync_ddl"]
-    if "num_dbs" in fixture_params: num_dbs = fixture_params["num_dbs"]
+    if "name_prefix" in fixture_params:
+      db_name_prefix = fixture_params["name_prefix"]
+    if "sync_ddl" in fixture_params:
+      sync_ddl = fixture_params["sync_ddl"]
+    if "num_dbs" in fixture_params:
+      num_dbs = fixture_params["num_dbs"]
 
   first_db_name = '{0}_{1}'.format(db_name_prefix, testid_checksum)
   db_names = [first_db_name]
@@ -256,8 +260,8 @@ def unique_database(request, testid_checksum):
     db_names.append(first_db_name + str(i))
   for db_name in db_names:
     if not is_valid_impala_identifier(db_name):
-      raise ValueError('Unique database name "{0}" is not a valid Impala 
identifer; check '
-                       'test function name or any prefixes for long length or 
invalid '
+      raise ValueError('Unique database name "{0}" is not a valid Impala 
identifier; check'
+                       ' test function name or any prefixes for long length or 
invalid '
                        'characters.'.format(db_name))
 
   def cleanup():
@@ -267,8 +271,8 @@ def unique_database(request, testid_checksum):
       request.instance.execute_query_expect_success(
           request.instance.client, 'DROP DATABASE `{0}` 
CASCADE'.format(db_name),
           {'sync_ddl': sync_ddl})
-      LOG.info('Dropped database "{0}" for test ID "{1}"'.format(db_name,
-                                                          
str(request.node.nodeid)))
+      LOG.info('Dropped database "{0}" for test ID "{1}"'.format(
+          db_name, str(request.node.nodeid)))
 
   request.addfinalizer(cleanup)
 
@@ -283,6 +287,7 @@ def unique_database(request, testid_checksum):
                                                                
str(request.node.nodeid)))
   return first_db_name
 
+
 @pytest.yield_fixture
 def kudu_client():
   """Provides a new Kudu client as a pytest fixture. The client only exists 
for the
@@ -401,6 +406,7 @@ def cursor(conn):
   with __auto_closed_cursor(conn) as cur:
     yield cur
 
+
 @pytest.yield_fixture(scope="class")
 def cls_cursor(conn):
   """Provides a new DB-API compliant cursor from a connection provided by the 
conn()
@@ -441,3 +447,20 @@ def __auto_closed_cursor(conn):
       cursor.close()
     except Exception as e:
       LOG.warn("Error closing Impala cursor: %s", e)
+
+
+@pytest.yield_fixture
+def impala_testinfra_cursor():
+  """
+  Yield a tests.comparison.db_connection ImpalaCursor, closing it after the test.
+  Used for "tests of tests" of the infra for the query generator, stress test, etc.
+  """
+  # This differs from the cursors above, which return direct Impyla cursors. 
Tests that
+  # use this fixture want to interact with the objects in
+  # tests.comparison.db_connection, which need testing.
+  with ImpalaConnection() as conn:
+    cursor = conn.cursor()
+    try:
+      yield cursor
+    finally:
+      cursor.close()

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ac516670/tests/metadata/test_show_create_table.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_show_create_table.py 
b/tests/metadata/test_show_create_table.py
index aae9f0c..19841ad 100644
--- a/tests/metadata/test_show_create_table.py
+++ b/tests/metadata/test_show_create_table.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 import pprint
+import pytest
 import re
 import shlex
 
@@ -46,8 +47,8 @@ class TestShowCreateTable(ImpalaTestSuite):
     cls.TestMatrix.clear_dimension('exec_option')
     # There is no reason to run these tests using all dimensions.
     
cls.TestMatrix.add_dimension(create_uncompressed_text_dimension(cls.get_workload()))
-    cls.TestMatrix.add_constraint(lambda v:
-        v.get_value('table_format').file_format == 'text' and
+    cls.TestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'text' and
         v.get_value('table_format').compression_codec == 'none')
 
   def test_show_create_table(self, vector, unique_database):
@@ -163,7 +164,8 @@ class TestShowCreateTable(ImpalaTestSuite):
 
   def __remove_properties_maps(self, s):
     """ Removes the tblproperties and serdeproperties from the string """
-    return re.sub(self.__properties_map_regex("WITH SERDEPROPERTIES"), "",
+    return re.sub(
+        self.__properties_map_regex("WITH SERDEPROPERTIES"), "",
         re.sub(self.__properties_map_regex("TBLPROPERTIES"), "", s)).strip()
 
   def __get_properties_map(self, s, properties_map_name):
@@ -194,11 +196,11 @@ class ShowCreateTableTestCase(object):
       self.existing_table = True
       self.show_create_table_sql = 
remove_comments(test_section['QUERY']).strip()
     elif 'CREATE_TABLE' in test_section:
-      self.__process_create_section(test_section['CREATE_TABLE'], 
test_file_name,
-          test_db_name, 'table')
+      self.__process_create_section(
+          test_section['CREATE_TABLE'], test_file_name, test_db_name, 'table')
     elif 'CREATE_VIEW' in test_section:
-      self.__process_create_section(test_section['CREATE_VIEW'], 
test_file_name,
-          test_db_name, 'view')
+      self.__process_create_section(
+          test_section['CREATE_VIEW'], test_file_name, test_db_name, 'view')
     else:
       assert 0, 'Error in test file %s. Test cases require a '\
           'CREATE_TABLE section.\n%s' %\
@@ -225,7 +227,7 @@ class ShowCreateTableTestCase(object):
     if len(tokens) < 3 or tokens[0].lower() != "create":
       assert 0, 'Error in test. Invalid CREATE TABLE statement: %s' % 
(create_table_sql)
     if tokens[1].lower() != table_type.lower() and \
-      (tokens[1].lower() != "external" or tokens[2].lower() != 
table_type.lower()):
+       (tokens[1].lower() != "external" or tokens[2].lower() != 
table_type.lower()):
       assert 0, 'Error in test. Invalid CREATE TABLE statement: %s' % 
(create_table_sql)
 
     if tokens[1].lower() == "external":
@@ -234,3 +236,48 @@ class ShowCreateTableTestCase(object):
     else:
       # expect a create table table_name ...
       return tokens[2]
+
+
+class TestInfraCompat(ImpalaTestSuite):
+  """
+  This test suite ensures our test infra (qgen, stress) can always properly 
read the
+  output of "SHOW CREATE TABLE" and find primary keys and updatable columns.
+  """
+
+  TABLE_PRIMARY_KEYS_MAPS = [
+      {'table': 'tpch.customer',
+       'primary_keys': (),
+       'updatable_columns': ()},
+      {'table': 'tpch_kudu.customer',
+       'primary_keys': ('c_custkey',),
+       'updatable_columns': ('c_name', 'c_address', 'c_nationkey', 'c_phone',
+                             'c_acctbal', 'c_mktsegment', 'c_comment')},
+      {'table': 'tpch_kudu.lineitem',
+       'primary_keys': ('l_orderkey', 'l_partkey', 'l_suppkey', 
'l_linenumber'),
+       'updatable_columns': ('l_quantity', 'l_extendedprice', 'l_discount', 
'l_tax',
+                             'l_returnflag', 'l_linestatus', 'l_shipdate', 
'l_commitdate',
+                             'l_receiptdate', 'l_shipinstruct', 'l_shipmode',
+                             'l_comment')}]
+
+  @SkipIf.kudu_not_supported
+  @pytest.mark.parametrize('table_primary_keys_map', TABLE_PRIMARY_KEYS_MAPS)
+  def test_primary_key_parse(self, impala_testinfra_cursor, 
table_primary_keys_map):
+    """
+    Test the query generator's Impala -> Postgres data migrator's ability to 
parse primary
+    keys via SHOW CREATE TABLE. If this test fails, update 
_fetch_primary_key_names, or fix
+    the SHOW CREATE TABLE defect.
+    """
+    assert impala_testinfra_cursor._fetch_primary_key_names(
+        table_primary_keys_map['table']) == 
table_primary_keys_map['primary_keys']
+
+  @SkipIf.kudu_not_supported
+  @pytest.mark.parametrize('table_primary_keys_map', TABLE_PRIMARY_KEYS_MAPS)
+  def test_load_table_with_primary_key_attr(self, impala_testinfra_cursor,
+                                            table_primary_keys_map):
+    """
+    Test that loading a table via describe_table() stores the expected primary keys
+    and updatable columns in the Table object model.
+    """
+    table = 
impala_testinfra_cursor.describe_table(table_primary_keys_map['table'])
+    assert table_primary_keys_map['primary_keys'] == table.primary_key_names
+    assert table_primary_keys_map['updatable_columns'] == 
table.updatable_column_names

Reply via email to