http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ab9e54bc/tests/metadata/test_ddl.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index 48f04ab..668b051 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -21,181 +21,171 @@ import time
 
 from test_ddl_base import TestDdlBase
 from tests.common.impala_test_suite import LOG
+from tests.common.parametrize import UniqueDatabase
 from tests.common.skip import SkipIf, SkipIfLocal, SkipIfOldAggsJoins
+from tests.common.test_dimensions import create_single_exec_option_dimension
 from tests.util.filesystem_utils import WAREHOUSE, IS_LOCAL, IS_S3
 
 # Validates DDL statements (create, drop)
 class TestDdlStatements(TestDdlBase):
-  TEST_DBS = ['ddl_test_db', 'ddl_purge_db', 'alter_table_test_db',
-              'alter_table_test_db2', 'function_ddl_test', 'udf_test', 'data_src_test',
-              'truncate_table_test_db', 'test_db', 'alter_purge_db', 'db_with_comment']
-
-  def setup_method(self, method):
-    self._cleanup()
-
-  def teardown_method(self, method):
-    self._cleanup()
-
-  def _cleanup(self):
-    map(self.cleanup_db, self.TEST_DBS)
-
   @SkipIfLocal.hdfs_client
-  @pytest.mark.execute_serially
-  def test_drop_table_with_purge(self):
+  def test_drop_table_with_purge(self, unique_database):
     """This test checks if the table data is permamently deleted in
     DROP TABLE <tbl> PURGE queries"""
-    DDL_PURGE_DB = "ddl_purge_db"
-    # Create a sample database ddl_purge_db and tables t1,t2 under it
-    self._create_db(DDL_PURGE_DB)
-    self.client.execute("create table {0}.t1(i int)".format(DDL_PURGE_DB))
-    self.client.execute("create table {0}.t2(i int)".format(DDL_PURGE_DB))
+    self.client.execute("create table {0}.t1(i int)".format(unique_database))
+    self.client.execute("create table {0}.t2(i int)".format(unique_database))
     # Create sample test data files under the table directories
     self.filesystem_client.create_file("test-warehouse/{0}.db/t1/t1.txt".\
-        format(DDL_PURGE_DB), file_data='t1')
+        format(unique_database), file_data='t1')
     self.filesystem_client.create_file("test-warehouse/{0}.db/t2/t2.txt".\
-        format(DDL_PURGE_DB), file_data='t2')
+        format(unique_database), file_data='t2')
     # Drop the table (without purge) and make sure it exists in trash
-    self.client.execute("drop table {0}.t1".format(DDL_PURGE_DB))
+    self.client.execute("drop table {0}.t1".format(unique_database))
     assert not self.filesystem_client.exists("test-warehouse/{0}.db/t1/t1.txt".\
-        format(DDL_PURGE_DB))
+        format(unique_database))
     assert not self.filesystem_client.exists("test-warehouse/{0}.db/t1/".\
-        format(DDL_PURGE_DB))
+        format(unique_database))
     assert self.filesystem_client.exists(\
         "user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/t1.txt".\
-        format(getpass.getuser(), DDL_PURGE_DB))
+        format(getpass.getuser(), unique_database))
     assert self.filesystem_client.exists(\
         "user/{0}/.Trash/Current/test-warehouse/{1}.db/t1".\
-        format(getpass.getuser(), DDL_PURGE_DB))
+        format(getpass.getuser(), unique_database))
     # Drop the table (with purge) and make sure it doesn't exist in trash
-    self.client.execute("drop table {0}.t2 purge".format(DDL_PURGE_DB))
+    self.client.execute("drop table {0}.t2 purge".format(unique_database))
     if not IS_S3:
       # In S3, deletes are eventual. So even though we dropped the table, the files
       # belonging to this table will still be visible for some unbounded time. This
       # happens only with PURGE. A regular DROP TABLE is just a copy of files which is
       # consistent.
       assert not self.filesystem_client.exists("test-warehouse/{0}.db/t2/".\
-          format(DDL_PURGE_DB))
+          format(unique_database))
       assert not self.filesystem_client.exists("test-warehouse/{0}.db/t2/t2.txt".\
-          format(DDL_PURGE_DB))
+          format(unique_database))
     assert not self.filesystem_client.exists(\
         "user/{0}/.Trash/Current/test-warehouse/{1}.db/t2/t2.txt".\
-        format(getpass.getuser(), DDL_PURGE_DB))
+        format(getpass.getuser(), unique_database))
     assert not self.filesystem_client.exists(\
         "user/{0}/.Trash/Current/test-warehouse/{1}.db/t2".\
-        format(getpass.getuser(), DDL_PURGE_DB))
+        format(getpass.getuser(), unique_database))
     # Create an external table t3 and run the same test as above. Make
     # sure the data is not deleted
-    self.filesystem_client.make_dir("test-warehouse/data_t3/", permission=777)
-    self.filesystem_client.create_file("test-warehouse/data_t3/data.txt", file_data='100')
-    self.client.execute("create external table {0}.t3(i int) stored as \
-      textfile location \'/test-warehouse/data_t3\'" .format(DDL_PURGE_DB))
-    self.client.execute("drop table {0}.t3 purge".format(DDL_PURGE_DB))
-    assert self.filesystem_client.exists("test-warehouse/data_t3/data.txt")
-    self.filesystem_client.delete_file_dir("test-warehouse/data_t3", recursive=True)
+    self.filesystem_client.make_dir(
+        "test-warehouse/{0}.db/data_t3/".format(unique_database), 
permission=777)
+    self.filesystem_client.create_file(
+        "test-warehouse/{0}.db/data_t3/data.txt".format(unique_database), 
file_data='100')
+    self.client.execute("create external table {0}.t3(i int) stored as "
+      "textfile location \'/test-warehouse/{0}.db/data_t3\'" 
.format(unique_database))
+    self.client.execute("drop table {0}.t3 purge".format(unique_database))
+    assert self.filesystem_client.exists(
+        "test-warehouse/{0}.db/data_t3/data.txt".format(unique_database))
+    self.filesystem_client.delete_file_dir(
+        "test-warehouse/{0}.db/data_t3".format(unique_database), 
recursive=True)
 
   @SkipIfLocal.hdfs_client
-  @pytest.mark.execute_serially
-  def test_drop_cleans_hdfs_dirs(self):
-    DDL_TEST_DB = "ddl_test_db"
-    self.filesystem_client.delete_file_dir("test-warehouse/ddl_test_db.db/",
-                                           recursive=True)
-    assert not self.filesystem_client.exists("test-warehouse/ddl_test_db.db/")
-
+  def test_drop_cleans_hdfs_dirs(self, unique_database):
     self.client.execute('use default')
-    self._create_db(DDL_TEST_DB)
     # Verify the db directory exists
-    assert self.filesystem_client.exists("test-warehouse/{0}.db/".format(DDL_TEST_DB))
+    assert self.filesystem_client.exists(
+        "test-warehouse/{0}.db/".format(unique_database))
 
-    self.client.execute("create table {0}.t1(i int)".format(DDL_TEST_DB))
+    self.client.execute("create table {0}.t1(i int)".format(unique_database))
     # Verify the table directory exists
-    assert self.filesystem_client.exists("test-warehouse/{0}.db/t1/".format(DDL_TEST_DB))
+    assert self.filesystem_client.exists(
+        "test-warehouse/{0}.db/t1/".format(unique_database))
 
     # Dropping the table removes the table's directory and preserves the db's directory
-    self.client.execute("drop table {0}.t1".format(DDL_TEST_DB))
+    self.client.execute("drop table {0}.t1".format(unique_database))
     assert not self.filesystem_client.exists(
-        "test-warehouse/{0}.db/t1/".format(DDL_TEST_DB))
-    assert self.filesystem_client.exists("test-warehouse/{0}.db/".format(DDL_TEST_DB))
+        "test-warehouse/{0}.db/t1/".format(unique_database))
+    assert self.filesystem_client.exists(
+        "test-warehouse/{0}.db/".format(unique_database))
 
     # Dropping the db removes the db's directory
-    self.client.execute("drop database {0}".format(DDL_TEST_DB))
-    assert not self.filesystem_client.exists("test-warehouse/{0}.db/".format(DDL_TEST_DB))
+    self.client.execute("drop database {0}".format(unique_database))
+    assert not self.filesystem_client.exists(
+        "test-warehouse/{0}.db/".format(unique_database))
 
     # Dropping the db using "cascade" removes all tables' and db's directories
     # but keeps the external tables' directory
-    self._create_db(DDL_TEST_DB)
-    self.client.execute("create table {0}.t1(i int)".format(DDL_TEST_DB))
-    self.client.execute("create table {0}.t2(i int)".format(DDL_TEST_DB))
+    self._create_db(unique_database)
+    self.client.execute("create table {0}.t1(i int)".format(unique_database))
+    self.client.execute("create table {0}.t2(i int)".format(unique_database))
     result = self.client.execute("create external table {0}.t3(i int) "
-        "location '{1}/{0}/t3/'".format(DDL_TEST_DB, WAREHOUSE))
-    self.client.execute("drop database {0} cascade".format(DDL_TEST_DB))
-    assert not self.filesystem_client.exists("test-warehouse/{0}.db/".format(DDL_TEST_DB))
+        "location '{1}/{0}/t3/'".format(unique_database, WAREHOUSE))
+    self.client.execute("drop database {0} cascade".format(unique_database))
     assert not self.filesystem_client.exists(
-        "test-warehouse/{0}.db/t1/".format(DDL_TEST_DB))
+        "test-warehouse/{0}.db/".format(unique_database))
     assert not self.filesystem_client.exists(
-        "test-warehouse/{0}.db/t2/".format(DDL_TEST_DB))
-    assert self.filesystem_client.exists("test-warehouse/{0}/t3/".format(DDL_TEST_DB))
-    self.filesystem_client.delete_file_dir("test-warehouse/{0}/t3/".format(DDL_TEST_DB),
-        recursive=True)
-    assert not self.filesystem_client.exists("test-warehouse/{0}/t3/".format(DDL_TEST_DB))
+        "test-warehouse/{0}.db/t1/".format(unique_database))
+    assert not self.filesystem_client.exists(
+        "test-warehouse/{0}.db/t2/".format(unique_database))
+    assert self.filesystem_client.exists(
+        "test-warehouse/{0}/t3/".format(unique_database))
+    self.filesystem_client.delete_file_dir(
+        "test-warehouse/{0}/t3/".format(unique_database), recursive=True)
+    assert not self.filesystem_client.exists(
+        "test-warehouse/{0}/t3/".format(unique_database))
+    # Re-create database to make unique_database teardown succeed.
+    self._create_db(unique_database)
 
   @SkipIfLocal.hdfs_client
-  @pytest.mark.execute_serially
-  def test_truncate_cleans_hdfs_files(self):
-    TRUNCATE_DB = "truncate_table_test_db"
-    self.filesystem_client.delete_file_dir("test-warehouse/%s.db/" % TRUNCATE_DB,
-        recursive=True)
-    assert not self.filesystem_client.exists("test-warehouse/%s.db/" % TRUNCATE_DB)
-
-    self._create_db(TRUNCATE_DB, sync=True)
+  def test_truncate_cleans_hdfs_files(self, unique_database):
     # Verify the db directory exists
-    assert self.filesystem_client.exists("test-warehouse/%s.db/" % TRUNCATE_DB)
+    assert self.filesystem_client.exists(
+        "test-warehouse/{0}.db/".format(unique_database))
 
-    self.client.execute("create table %s.t1(i int)" % TRUNCATE_DB)
+    self.client.execute("create table {0}.t1(i int)".format(unique_database))
     # Verify the table directory exists
-    assert self.filesystem_client.exists("test-warehouse/truncate_table_test_db.db/t1/")
+    assert self.filesystem_client.exists(
+        "test-warehouse/{0}.db/t1/".format(unique_database))
 
     try:
       # If we're testing S3, we want the staging directory to be created.
       self.client.execute("set s3_skip_insert_staging=false")
       # Should have created one file in the table's dir
-      self.client.execute("insert into %s.t1 values (1)" % TRUNCATE_DB)
-      assert len(self.filesystem_client.ls("test-warehouse/%s.db/t1/" % TRUNCATE_DB)) == 2
+      self.client.execute("insert into {0}.t1 values (1)".format(unique_database))
+      assert len(self.filesystem_client.ls(
+          "test-warehouse/{0}.db/t1/".format(unique_database))) == 2
 
       # Truncating the table removes the data files and preserves the table's directory
-      self.client.execute("truncate table %s.t1" % TRUNCATE_DB)
-      assert len(self.filesystem_client.ls("test-warehouse/%s.db/t1/" % TRUNCATE_DB)) == 1
+      self.client.execute("truncate table {0}.t1".format(unique_database))
+      assert len(self.filesystem_client.ls(
+          "test-warehouse/{0}.db/t1/".format(unique_database))) == 1
 
       self.client.execute(
-          "create table %s.t2(i int) partitioned by (p int)" % TRUNCATE_DB)
+          "create table {0}.t2(i int) partitioned by (p 
int)".format(unique_database))
       # Verify the table directory exists
-      assert self.filesystem_client.exists("test-warehouse/%s.db/t2/" % TRUNCATE_DB)
+      assert self.filesystem_client.exists(
+          "test-warehouse/{0}.db/t2/".format(unique_database))
 
       # Should have created the partition dir, which should contain exactly one file
       self.client.execute(
-          "insert into %s.t2 partition(p=1) values (1)" % TRUNCATE_DB)
+          "insert into {0}.t2 partition(p=1) values 
(1)".format(unique_database))
       assert len(self.filesystem_client.ls(
-          "test-warehouse/%s.db/t2/p=1" % TRUNCATE_DB)) == 1
+          "test-warehouse/{0}.db/t2/p=1".format(unique_database))) == 1
 
       # Truncating the table removes the data files and preserves the partition's directory
-      self.client.execute("truncate table %s.t2" % TRUNCATE_DB)
-      assert self.filesystem_client.exists("test-warehouse/%s.db/t2/p=1" % TRUNCATE_DB)
+      self.client.execute("truncate table {0}.t2".format(unique_database))
+      assert self.filesystem_client.exists(
+          "test-warehouse/{0}.db/t2/p=1".format(unique_database))
       assert len(self.filesystem_client.ls(
-          "test-warehouse/%s.db/t2/p=1" % TRUNCATE_DB)) == 0
+          "test-warehouse/{0}.db/t2/p=1".format(unique_database))) == 0
     finally:
       # Reset to its default value.
       self.client.execute("set s3_skip_insert_staging=true")
 
-  @pytest.mark.execute_serially
-  def test_truncate_table(self, vector):
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  def test_truncate_table(self, vector, unique_database):
     vector.get_value('exec_option')['abort_on_error'] = False
-    self._create_db('truncate_table_test_db', sync=True)
-    self.run_test_case('QueryTest/truncate-table', vector,
-        use_db='truncate_table_test_db',
+    self.run_test_case('QueryTest/truncate-table', vector, use_db=unique_database,
         multiple_impalad=self._use_multiple_impalad(vector))
 
-  @pytest.mark.execute_serially
-  def test_create_database(self, vector):
-    self.run_test_case('QueryTest/create-database', vector,
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  def test_create_database(self, vector, unique_database):
+    # The unique_database fixture provides the .test file with a unique database name which allows
+    # us to run this test in parallel with others.
+    self.run_test_case('QueryTest/create-database', vector, use_db=unique_database,
         multiple_impalad=self._use_multiple_impalad(vector))
 
   # There is a query in QueryTest/create-table that references nested types, which is not
@@ -203,247 +193,146 @@ class TestDdlStatements(TestDdlBase):
   # additional coverage by running a DDL test under the old aggs and joins, it can be
   # skipped.
   @SkipIfOldAggsJoins.nested_types
-  @pytest.mark.execute_serially
-  def test_create_table(self, vector):
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  def test_create_table(self, vector, unique_database):
     vector.get_value('exec_option')['abort_on_error'] = False
-    test_db_name = 'ddl_test_db'
-    self._create_db(test_db_name, sync=True)
-    self.run_test_case('QueryTest/create-table', vector, use_db=test_db_name,
+    self.run_test_case('QueryTest/create-table', vector, use_db=unique_database,
         multiple_impalad=self._use_multiple_impalad(vector))
 
-  @pytest.mark.execute_serially
-  def test_create_table_like_table(self, vector):
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  def test_create_table_like_table(self, vector, unique_database):
     vector.get_value('exec_option')['abort_on_error'] = False
-    test_db_name = 'ddl_test_db'
-    self._create_db(test_db_name, sync=True)
-    self.run_test_case('QueryTest/create-table-like-table', vector, use_db=test_db_name,
-        multiple_impalad=self._use_multiple_impalad(vector))
+    self.run_test_case('QueryTest/create-table-like-table', vector,
+        use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector))
 
-  @pytest.mark.execute_serially
-  def test_create_table_like_file(self, vector):
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  def test_create_table_like_file(self, vector, unique_database):
     vector.get_value('exec_option')['abort_on_error'] = False
-    test_db_name = 'ddl_test_db'
-    self._create_db(test_db_name, sync=True)
-    self.run_test_case('QueryTest/create-table-like-file', vector, use_db=test_db_name,
-        multiple_impalad=self._use_multiple_impalad(vector))
+    self.run_test_case('QueryTest/create-table-like-file', vector,
+        use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector))
 
-  @pytest.mark.execute_serially
-  def test_create_table_as_select(self, vector):
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  def test_create_table_as_select(self, vector, unique_database):
     vector.get_value('exec_option')['abort_on_error'] = False
-    test_db_name = 'ddl_test_db'
-    self._create_db(test_db_name, sync=True)
-    self.run_test_case('QueryTest/create-table-as-select', vector, use_db=test_db_name,
-        multiple_impalad=self._use_multiple_impalad(vector))
+    self.run_test_case('QueryTest/create-table-as-select', vector,
+        use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector))
 
   @SkipIf.kudu_not_supported
-  @pytest.mark.execute_serially
-  # TODO: Move this and other Kudu-related DDL tests into a separate py test
-  # under test_ddl_base.py.
-  def test_create_kudu(self, vector):
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  def test_create_kudu(self, vector, unique_database):
     self.expected_exceptions = 2
     vector.get_value('exec_option')['abort_on_error'] = False
-    self._create_db('ddl_test_db', sync=True)
-    self.run_test_case('QueryTest/create_kudu', vector, use_db='ddl_test_db',
+    self.run_test_case('QueryTest/create_kudu', vector, use_db=unique_database,
         multiple_impalad=self._use_multiple_impalad(vector))
 
-  @pytest.mark.execute_serially
-  def test_sync_ddl_drop(self, vector):
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  def test_sync_ddl_drop(self, vector, unique_database):
     """Verifies the catalog gets updated properly when dropping objects with 
sync_ddl
     enabled"""
-    self._create_db('ddl_test_db')
     self.client.set_configuration({'sync_ddl': 1})
     # Drop the database immediately after creation (within a statestore heartbeat) and
     # verify the catalog gets updated properly.
-    self.client.execute('drop database ddl_test_db')
-    assert 'ddl_test_db' not in self.all_db_names()
+    self.client.execute("drop database {0}".format(unique_database))
+    assert unique_database not in self.all_db_names()
+    # Re-create database to make unique_database teardown succeed.
+    self._create_db(unique_database)
 
   # TODO: don't use hdfs_client
   @SkipIfLocal.hdfs_client
-  @pytest.mark.execute_serially
-  def test_alter_table(self, vector):
+  @UniqueDatabase.parametrize(sync_ddl=True, num_dbs=2)
+  def test_alter_table(self, vector, unique_database):
     vector.get_value('exec_option')['abort_on_error'] = False
-    self.__test_alter_table_cleanup()
-    # Create directory for partition data that does not use the (key=value)
-    # format.
-    self.filesystem_client.make_dir("test-warehouse/part_data/", 
permission=777)
-    self.filesystem_client.create_file(
-        "test-warehouse/part_data/data.txt", file_data='1984')
-
-    try:
-      # Create test databases
-      self._create_db('alter_table_test_db', sync=True)
-      self._create_db('alter_table_test_db2', sync=True)
-      self.run_test_case('QueryTest/alter-table', vector, use_db='alter_table_test_db',
-          multiple_impalad=self._use_multiple_impalad(vector))
-    finally:
-      self.__test_alter_table_cleanup()
-
-  def __test_alter_table_cleanup(self):
-    if IS_LOCAL: return
-    # Cleanup the test table HDFS dirs between test runs so there are no errors the next
-    # time a table is created with the same location. This also helps remove any stale
-    # data from the last test run.
-    for dir_ in ['part_data', 't1_tmp1', 't_part_tmp']:
-      self.filesystem_client.delete_file_dir('test-warehouse/%s' % dir_, recursive=True)
 
-  @pytest.mark.execute_serially
-  def test_alter_set_column_stats(self, vector):
-    self._create_db('alter_table_test_db', sync=True)
-    self.run_test_case('QueryTest/alter-table-set-column-stats',
-        vector, use_db='alter_table_test_db',
+    # Create an unpartitioned table to get a filesystem directory that does not
+    # use the (key=value) format. The directory is automatically cleaned up
+    # by the unique_database fixture.
+    self.client.execute("create table {0}.part_data (i int)".format(unique_database))
+    assert self.filesystem_client.exists(
+        "test-warehouse/{0}.db/part_data".format(unique_database))
+    self.filesystem_client.create_file(
+        "test-warehouse/{0}.db/part_data/data.txt".format(unique_database),
+        file_data='1984')
+    self.run_test_case('QueryTest/alter-table', vector, use_db=unique_database,
         multiple_impalad=self._use_multiple_impalad(vector))
 
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  def test_alter_set_column_stats(self, vector, unique_database):
+    self.run_test_case('QueryTest/alter-table-set-column-stats', vector,
+        use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector))
+
   @SkipIfLocal.hdfs_client
-  @pytest.mark.execute_serially
-  def test_drop_partition_with_purge(self, vector):
+  def test_drop_partition_with_purge(self, vector, unique_database):
     """Verfies whether alter <tbl> drop partition purge actually skips trash"""
-    ALTER_PURGE_DB = "alter_purge_db"
-    # Create a sample database alter_purge_db and table t1 in it
-    self._create_db(ALTER_PURGE_DB)
-    self.client.execute("create table {0}.t1(i int) partitioned\
-        by (j int)".format(ALTER_PURGE_DB))
+    self.client.execute(
+        "create table {0}.t1(i int) partitioned by (j 
int)".format(unique_database))
     # Add two partitions (j=1) and (j=2) to table t1
-    self.client.execute("alter table {0}.t1 add 
partition(j=1)".format(ALTER_PURGE_DB))
-    self.client.execute("alter table {0}.t1 add 
partition(j=2)".format(ALTER_PURGE_DB))
+    self.client.execute("alter table {0}.t1 add 
partition(j=1)".format(unique_database))
+    self.client.execute("alter table {0}.t1 add 
partition(j=2)".format(unique_database))
     self.filesystem_client.create_file(\
-        "test-warehouse/{0}.db/t1/j=1/j1.txt".format(ALTER_PURGE_DB), 
file_data='j1')
+        "test-warehouse/{0}.db/t1/j=1/j1.txt".format(unique_database), 
file_data='j1')
     self.filesystem_client.create_file(\
-        "test-warehouse/{0}.db/t1/j=2/j2.txt".format(ALTER_PURGE_DB), 
file_data='j2')
+        "test-warehouse/{0}.db/t1/j=2/j2.txt".format(unique_database), 
file_data='j2')
     # Drop the partition (j=1) without purge and make sure it exists in trash
-    self.client.execute("alter table {0}.t1 drop 
partition(j=1)".format(ALTER_PURGE_DB));
+    self.client.execute("alter table {0}.t1 drop 
partition(j=1)".format(unique_database))
     assert not self.filesystem_client.exists("test-warehouse/{0}.db/t1/j=1/j1.txt".\
-        format(ALTER_PURGE_DB))
+        format(unique_database))
     assert not self.filesystem_client.exists("test-warehouse/{0}.db/t1/j=1".\
-        format(ALTER_PURGE_DB))
+        format(unique_database))
     assert self.filesystem_client.exists(\
         "user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=1/j1.txt".\
-        format(getpass.getuser(), ALTER_PURGE_DB))
+        format(getpass.getuser(), unique_database))
     assert self.filesystem_client.exists(\
         "user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=1".\
-        format(getpass.getuser(), ALTER_PURGE_DB))
+        format(getpass.getuser(), unique_database))
     # Drop the partition (with purge) and make sure it doesn't exist in trash
     self.client.execute("alter table {0}.t1 drop partition(j=2) purge".\
-        format(ALTER_PURGE_DB));
+        format(unique_database));
     if not IS_S3:
       # In S3, deletes are eventual. So even though we dropped the partition, the files
       # belonging to this partition will still be visible for some unbounded time. This
       # happens only with PURGE. A regular DROP TABLE is just a copy of files which is
       # consistent.
       assert not self.filesystem_client.exists("test-warehouse/{0}.db/t1/j=2/j2.txt".\
-          format(ALTER_PURGE_DB))
+          format(unique_database))
       assert not self.filesystem_client.exists("test-warehouse/{0}.db/t1/j=2".\
-          format(ALTER_PURGE_DB))
+          format(unique_database))
     assert not self.filesystem_client.exists(\
         "user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=2".\
-        format(getpass.getuser(), ALTER_PURGE_DB))
+        format(getpass.getuser(), unique_database))
     assert not self.filesystem_client.exists(
         "user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=2/j2.txt".\
-        format(getpass.getuser(), ALTER_PURGE_DB))
+        format(getpass.getuser(), unique_database))
 
-  @pytest.mark.execute_serially
-  def test_views_ddl(self, vector):
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  def test_views_ddl(self, vector, unique_database):
     vector.get_value('exec_option')['abort_on_error'] = False
-    self._create_db('ddl_test_db', sync=True)
-    self.run_test_case('QueryTest/views-ddl', vector, use_db='ddl_test_db',
+    self.run_test_case('QueryTest/views-ddl', vector, use_db=unique_database,
         multiple_impalad=self._use_multiple_impalad(vector))
 
-  @pytest.mark.execute_serially
-  def test_functions_ddl(self, vector):
-    self._create_db('function_ddl_test', sync=True)
-    self.run_test_case('QueryTest/functions-ddl', vector, use_db='function_ddl_test',
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  def test_functions_ddl(self, vector, unique_database):
+    self.run_test_case('QueryTest/functions-ddl', vector, use_db=unique_database,
         multiple_impalad=self._use_multiple_impalad(vector))
 
-  @pytest.mark.execute_serially
-  def test_create_drop_function(self, vector):
-    """This will create, run, and drop the same function repeatedly, 
exercising the
-    lib cache mechanism.
-    """
-    create_fn_stmt = ("create function f() returns int "
-                      "location '%s/libTestUdfs.so' symbol='NoArgs'" % WAREHOUSE)
-    select_stmt = "select f() from functional.alltypes limit 10"
-    drop_fn_stmt = "drop function %s f()"
-    self.create_drop_ddl(vector, "udf_test", [create_fn_stmt], [drop_fn_stmt],
-        select_stmt)
-
-  @pytest.mark.execute_serially
-  def test_create_drop_data_src(self, vector):
-    """This will create, run, and drop the same data source repeatedly, 
exercising
-    the lib cache mechanism.
-    """
-    create_ds_stmt = ("CREATE DATA SOURCE test_data_src "
-        "LOCATION '%s/data-sources/test-data-source.jar' "
-        "CLASS 'com.cloudera.impala.extdatasource.AllTypesDataSource' "
-        "API_VERSION 'V1'" % WAREHOUSE)
-    create_tbl_stmt = """CREATE TABLE data_src_tbl (x int)
-        PRODUCED BY DATA SOURCE test_data_src('dummy_init_string')"""
-    drop_ds_stmt = "drop data source %s test_data_src"
-    drop_tbl_stmt = "drop table %s data_src_tbl"
-    select_stmt = "select * from data_src_tbl limit 1"
-    class_cache_hits_metric = "external-data-source.class-cache.hits"
-    class_cache_misses_metric = "external-data-source.class-cache.misses"
-
-    create_stmts = [create_ds_stmt, create_tbl_stmt]
-    drop_stmts = [drop_tbl_stmt, drop_ds_stmt]
-
-    # The ImpaladService is used to capture metrics
-    service = self.impalad_test_service
-
-    # Initial metric values
-    class_cache_hits = service.get_metric_value(class_cache_hits_metric)
-    class_cache_misses = service.get_metric_value(class_cache_misses_metric)
-    # Test with 1 node so we can check the metrics on only the coordinator
-    vector.get_value('exec_option')['num_nodes'] = 1
-    num_iterations = 2
-    self.create_drop_ddl(vector, "data_src_test", create_stmts, drop_stmts,
-        select_stmt, num_iterations)
-
-    # Check class cache metrics. Shouldn't have any new cache hits, there should be
-    # 2 cache misses for every iteration (jar is loaded by both the FE and BE).
-    expected_cache_misses = class_cache_misses + (num_iterations * 2)
-    service.wait_for_metric_value(class_cache_hits_metric, class_cache_hits)
-    service.wait_for_metric_value(class_cache_misses_metric,
-        expected_cache_misses)
-
-    # Test with a table that caches the class
-    create_tbl_stmt = """CREATE TABLE data_src_tbl (x int)
-        PRODUCED BY DATA SOURCE test_data_src('CACHE_CLASS::dummy_init_string')"""
-    create_stmts = [create_ds_stmt, create_tbl_stmt]
-    # Run once before capturing metrics because the class already may be cached from
-    # a previous test run.
-    # TODO: Provide a way to clear the cache
-    self.create_drop_ddl(vector, "data_src_test", create_stmts, drop_stmts,
-        select_stmt, 1)
-
-    # Capture metric values and run again, should hit the cache.
-    class_cache_hits = service.get_metric_value(class_cache_hits_metric)
-    class_cache_misses = service.get_metric_value(class_cache_misses_metric)
-    self.create_drop_ddl(vector, "data_src_test", create_stmts, drop_stmts,
-        select_stmt, 1)
-    service.wait_for_metric_value(class_cache_hits_metric, class_cache_hits + 2)
-    service.wait_for_metric_value(class_cache_misses_metric, class_cache_misses)
-
   @SkipIfLocal.hdfs_client
-  @pytest.mark.execute_serially
-  def test_create_alter_bulk_partition(self, vector):
-    TBL_NAME = 'foo_part'
+  def test_create_alter_bulk_partition(self, vector, unique_database):
     # Change the scale depending on the exploration strategy, with 50 partitions this
-    # takes a few minutes to run, with 10 partitions it takes ~50s for two configurations.
+    # test runs a few minutes, with 10 partitions it takes ~50s for two configurations.
     num_parts = 50 if self.exploration_strategy() == 'exhaustive' else 10
-    self.client.execute("use default")
-    self.client.execute("drop table if exists {0}".format(TBL_NAME))
-    self.client.execute("""create table {0}(i int) partitioned by(j int, s 
string)
-         location '{1}/{0}'""".format(TBL_NAME, WAREHOUSE))
+    fq_tbl_name = unique_database + ".part_test_tbl"
+    self.client.execute("create table {0}(i int) partitioned by(j int, s 
string) "
+         "location '{1}/{0}'".format(fq_tbl_name, WAREHOUSE))
 
     # Add some partitions (first batch of two)
     for i in xrange(num_parts / 5):
       start = time.time()
-      self.client.execute("alter table {0} add partition(j={1}, 
s='{1}')".format(TBL_NAME,
-                                                                               
  i))
+      self.client.execute(
+          "alter table {0} add partition(j={1}, s='{1}')".format(fq_tbl_name, 
i))
       LOG.info('ADD PARTITION #%d exec time: %s' % (i, time.time() - start))
 
     # Modify one of the partitions
-    self.client.execute("alter table %s partition(j=1, s='1')"
-        " set fileformat parquetfile" % TBL_NAME)
+    self.client.execute("alter table {0} partition(j=1, s='1')"
+        " set fileformat parquetfile".format(fq_tbl_name))
 
     # Alter one partition to a non-existent location twice (IMPALA-741)
     self.filesystem_client.delete_file_dir("tmp/dont_exist1/", recursive=True)
@@ -451,34 +340,31 @@ class TestDdlStatements(TestDdlBase):
 
     self.execute_query_expect_success(self.client,
         "alter table {0} partition(j=1,s='1') set location 
'{1}/tmp/dont_exist1'"
-        .format(TBL_NAME, WAREHOUSE))
+        .format(fq_tbl_name, WAREHOUSE))
     self.execute_query_expect_success(self.client,
         "alter table {0} partition(j=1,s='1') set location 
'{1}/tmp/dont_exist2'"
-        .format(TBL_NAME, WAREHOUSE))
+        .format(fq_tbl_name, WAREHOUSE))
 
     # Add some more partitions
     for i in xrange(num_parts / 5, num_parts):
       start = time.time()
-      self.client.execute("alter table {0} add 
partition(j={1},s='{1}')".format(TBL_NAME,
-                                                                               
 i))
+      self.client.execute(
+          "alter table {0} add partition(j={1},s='{1}')".format(fq_tbl_name, 
i))
       LOG.info('ADD PARTITION #%d exec time: %s' % (i, time.time() - start))
 
     # Insert data and verify it shows up.
-    self.client.execute("insert into table {0} partition(j=1, s='1') select 1"
-      .format(TBL_NAME))
-    assert '1' == self.execute_scalar("select count(*) from {0}".format(TBL_NAME))
-    self.client.execute("drop table {0}".format(TBL_NAME))
+    self.client.execute(
+        "insert into table {0} partition(j=1, s='1') select 
1".format(fq_tbl_name))
+    assert '1' == self.execute_scalar("select count(*) from 
{0}".format(fq_tbl_name))
 
-  @pytest.mark.execute_serially
-  def test_create_alter_tbl_properties(self, vector):
-    self._create_db('alter_table_test_db', sync=True)
-    self.client.execute("use alter_table_test_db")
+  def test_create_alter_tbl_properties(self, vector, unique_database):
+    fq_tbl_name = unique_database + ".test_alter_tbl"
 
     # Specify TBLPROPERTIES and SERDEPROPERTIES at CREATE time
-    self.client.execute("""create table test_alter_tbl (i int)
+    self.client.execute("""create table {0} (i int)
     with serdeproperties ('s1'='s2', 's3'='s4')
-    tblproperties ('p1'='v0', 'p1'='v1')""")
-    properties = self._get_tbl_properties('test_alter_tbl')
+    tblproperties ('p1'='v0', 'p1'='v1')""".format(fq_tbl_name))
+    properties = self._get_tbl_properties(fq_tbl_name)
 
     assert len(properties) == 2
     # The transient_lastDdlTime is variable, so don't verify the value.
@@ -486,22 +372,124 @@ class TestDdlStatements(TestDdlBase):
     del properties['transient_lastDdlTime']
     assert {'p1': 'v1'} == properties
 
-    properties = self._get_serde_properties('test_alter_tbl')
+    properties = self._get_serde_properties(fq_tbl_name)
     assert {'s1': 's2', 's3': 's4'} == properties
 
     # Modify the SERDEPROPERTIES using ALTER TABLE SET.
-    self.client.execute("alter table test_alter_tbl set serdeproperties "\
-        "('s1'='new', 's5'='s6')")
-    properties = self._get_serde_properties('test_alter_tbl')
+    self.client.execute("alter table {0} set serdeproperties "
+        "('s1'='new', 's5'='s6')".format(fq_tbl_name))
+    properties = self._get_serde_properties(fq_tbl_name)
     assert {'s1': 'new', 's3': 's4', 's5': 's6'} == properties
 
     # Modify the TBLPROPERTIES using ALTER TABLE SET.
-    self.client.execute("alter table test_alter_tbl set tblproperties "\
-        "('prop1'='val1', 'p2'='val2', 'p2'='val3', ''='')")
-    properties = self._get_tbl_properties('test_alter_tbl')
+    self.client.execute("alter table {0} set tblproperties "
+        "('prop1'='val1', 'p2'='val2', 'p2'='val3', 
''='')".format(fq_tbl_name))
+    properties = self._get_tbl_properties(fq_tbl_name)
 
     assert 'transient_lastDdlTime' in properties
     assert properties['p1'] == 'v1'
     assert properties['prop1'] == 'val1'
     assert properties['p2'] == 'val3'
     assert properties[''] == ''
+
+
+# IMPALA-2002: Tests repeated adding/dropping of .jar and .so in the lib cache.
+class TestLibCache(TestDdlBase):
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestLibCache, cls).add_test_dimensions()
+    cls.TestMatrix.add_dimension(create_single_exec_option_dimension())
+
+  def test_create_drop_function(self, vector, unique_database):
+    """This will create, run, and drop the same function repeatedly, 
exercising the
+    lib cache mechanism.
+    """
+    create_fn_stmt = ("create function {0}.f() returns int "
+                      "location '{1}/libTestUdfs.so' symbol='NoArgs'"
+                      .format(unique_database, WAREHOUSE))
+    select_stmt = ("select {0}.f() from functional.alltypes limit 10"
+                   .format(unique_database))
+    drop_fn_stmt = "drop function %s {0}.f()".format(unique_database)
+    self.create_drop_ddl(vector, [create_fn_stmt], [drop_fn_stmt], select_stmt)
+
+  # Run serially because this test inspects global impalad metrics.
+  # TODO: The metrics checks could be relaxed to enable running this test in
+  # parallel, but that might need a more general wait_for_metric_value().
+  @pytest.mark.execute_serially
+  def test_create_drop_data_src(self, vector, unique_database):
+    """This will create, run, and drop the same data source repeatedly, 
exercising
+    the lib cache mechanism.
+    """
+    data_src_name = unique_database + "_datasrc"
+    create_ds_stmt = ("CREATE DATA SOURCE {0} "
+        "LOCATION '{1}/data-sources/test-data-source.jar' "
+        "CLASS 'com.cloudera.impala.extdatasource.AllTypesDataSource' "
+        "API_VERSION 'V1'".format(data_src_name, WAREHOUSE))
+    create_tbl_stmt = ("CREATE TABLE {0}.data_src_tbl (x int) "
+        "PRODUCED BY DATA SOURCE {1}('dummy_init_string')")\
+        .format(unique_database, data_src_name)
+    drop_ds_stmt = "drop data source %s {0}".format(data_src_name)
+    drop_tbl_stmt = "drop table %s {0}.data_src_tbl".format(unique_database)
+    select_stmt = "select * from {0}.data_src_tbl limit 
1".format(unique_database)
+    class_cache_hits_metric = "external-data-source.class-cache.hits"
+    class_cache_misses_metric = "external-data-source.class-cache.misses"
+
+    create_stmts = [create_ds_stmt, create_tbl_stmt]
+    drop_stmts = [drop_tbl_stmt, drop_ds_stmt]
+
+    # The ImpaladService is used to capture metrics
+    service = self.impalad_test_service
+
+    # Initial metric values
+    class_cache_hits = service.get_metric_value(class_cache_hits_metric)
+    class_cache_misses = service.get_metric_value(class_cache_misses_metric)
+    # Test with 1 node so we can check the metrics on only the coordinator
+    vector.get_value('exec_option')['num_nodes'] = 1
+    num_iterations = 2
+    self.create_drop_ddl(vector, create_stmts, drop_stmts, select_stmt, num_iterations)
+
+    # Check class cache metrics. Shouldn't have any new cache hits, there should be
+    # 2 cache misses for every iteration (jar is loaded by both the FE and BE).
+    expected_cache_misses = class_cache_misses + (num_iterations * 2)
+    service.wait_for_metric_value(class_cache_hits_metric, class_cache_hits)
+    service.wait_for_metric_value(class_cache_misses_metric,
+        expected_cache_misses)
+
+    # Test with a table that caches the class
+    create_tbl_stmt = ("CREATE TABLE {0}.data_src_tbl (x int) "
+        "PRODUCED BY DATA SOURCE {1}('CACHE_CLASS::dummy_init_string')")\
+        .format(unique_database, data_src_name)
+    create_stmts = [create_ds_stmt, create_tbl_stmt]
+    # Run once before capturing metrics because the class already may be cached from
+    # a previous test run.
+    # TODO: Provide a way to clear the cache
+    self.create_drop_ddl(vector, create_stmts, drop_stmts, select_stmt, 1)
+
+    # Capture metric values and run again, should hit the cache.
+    class_cache_hits = service.get_metric_value(class_cache_hits_metric)
+    class_cache_misses = service.get_metric_value(class_cache_misses_metric)
+    self.create_drop_ddl(vector, create_stmts, drop_stmts, select_stmt, 1)
+    service.wait_for_metric_value(class_cache_hits_metric, class_cache_hits + 2)
+    service.wait_for_metric_value(class_cache_misses_metric, class_cache_misses)
+
+  def create_drop_ddl(self, vector, create_stmts, drop_stmts, select_stmt,
+      num_iterations=3):
+    """Helper method to run CREATE/DROP DDL commands repeatedly and exercise 
the lib
+    cache. create_stmts is the list of CREATE statements to be executed in 
order
+    drop_stmts is the list of DROP statements to be executed in order. Each 
statement
+    should have a '%s' placeholder to insert "IF EXISTS" or "". The 
select_stmt is just a
+    single statement to test after executing the CREATE statements.
+    TODO: it's hard to tell that the cache is working (i.e. if it did nothing 
to drop
+    the cache, these tests would still pass). Testing that is a bit harder and 
requires
+    us to update the udf binary in the middle.
+    """
+    self.client.set_configuration(vector.get_value("exec_option"))
+    for drop_stmt in drop_stmts: self.client.execute(drop_stmt % ("if exists"))
+    for i in xrange(0, num_iterations):
+      for create_stmt in create_stmts: self.client.execute(create_stmt)
+      self.client.execute(select_stmt)
+      for drop_stmt in drop_stmts: self.client.execute(drop_stmt % (""))
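
For readers new to this change: the commit drops the hand-rolled setup_method/teardown_method
cleanup and the shared TEST_DBS list in favour of the pytest unique_database fixture, so each
test works in its own throwaway database and no longer needs to run serially. As a rough sketch
of the idea only (the run_ddl helper, the naming scheme, and the fixture body below are
illustrative assumptions, not the fixture that actually ships in the Impala test suite), such a
fixture creates a database before the test and drops it afterwards:

  import uuid
  import pytest

  def run_ddl(stmt):
      # Hypothetical stand-in for a client call such as self.client.execute(stmt).
      print(stmt)

  @pytest.fixture
  def unique_database(request):
      # Derive a database name that is unique to this test invocation.
      safe_name = request.node.name.replace('-', '_').replace('[', '_').replace(']', '_')
      db_name = '{0}_{1}'.format(safe_name, uuid.uuid4().hex[:8])
      run_ddl('drop database if exists {0} cascade'.format(db_name))
      run_ddl('create database {0}'.format(db_name))
      yield db_name
      # Teardown: drop the database and any tables the test left behind.
      run_ddl('drop database if exists {0} cascade'.format(db_name))

Tests then take unique_database as an argument and use it as a database name; the
UniqueDatabase.parametrize(sync_ddl=True) decorator seen above appears to combine the same
fixture with a SYNC_DDL exec-option dimension.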

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ab9e54bc/tests/metadata/test_ddl_base.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_ddl_base.py b/tests/metadata/test_ddl_base.py
index 300a270..b20d897 100644
--- a/tests/metadata/test_ddl_base.py
+++ b/tests/metadata/test_ddl_base.py
@@ -46,28 +46,6 @@ class TestDdlBase(ImpalaTestSuite):
     # There is no reason to run these tests using all dimensions.
     
cls.TestMatrix.add_dimension(create_uncompressed_text_dimension(cls.get_workload()))
 
-  def create_drop_ddl(self, vector, db_name, create_stmts, drop_stmts, select_stmt,
-      num_iterations=3):
-    """Helper method to run CREATE/DROP DDL commands repeatedly and exercise 
the lib
-    cache. create_stmts is the list of CREATE statements to be executed in 
order
-    drop_stmts is the list of DROP statements to be executed in order. Each 
statement
-    should have a '%s' placeholder to insert "IF EXISTS" or "". The 
select_stmt is just a
-    single statement to test after executing the CREATE statements.
-    TODO: it's hard to tell that the cache is working (i.e. if it did nothing 
to drop
-    the cache, these tests would still pass). Testing that is a bit harder and 
requires
-    us to update the udf binary in the middle.
-    """
-    # The db may already exist, clean it up.
-    self.cleanup_db(db_name)
-    self._create_db(db_name, sync=True)
-    self.client.set_configuration(vector.get_value('exec_option'))
-    self.client.execute("use %s" % (db_name,))
-    for drop_stmt in drop_stmts: self.client.execute(drop_stmt % ("if exists"))
-    for i in xrange(0, num_iterations):
-      for create_stmt in create_stmts: self.client.execute(create_stmt)
-      self.client.execute(select_stmt)
-      for drop_stmt in drop_stmts: self.client.execute(drop_stmt % (""))
-
   @classmethod
   def _use_multiple_impalad(cls, vector):
     return vector.get_value('exec_option')['sync_ddl'] == 1
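
Note that create_drop_ddl is not deleted outright: it reappears (minus the db_name argument) on
the new TestLibCache class in test_ddl.py above. Its contract is that each DROP statement carries
a '%s' placeholder that is filled with "if exists" on the first, defensive pass and with an empty
string on later iterations. A small illustration of that convention, using a made-up database and
function name:

  # Each drop statement leaves a slot for an optional "if exists" qualifier.
  drop_fn_stmt = "drop function %s some_db.f()"   # hypothetical statement
  print(drop_fn_stmt % "if exists")   # -> drop function if exists some_db.f()
  print(drop_fn_stmt % "")            # -> drop function  some_db.f()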

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ab9e54bc/tests/metadata/test_hms_integration.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_hms_integration.py b/tests/metadata/test_hms_integration.py
index 90d47d9..23254ac 100644
--- a/tests/metadata/test_hms_integration.py
+++ b/tests/metadata/test_hms_integration.py
@@ -35,6 +35,11 @@ from tests.common.test_dimensions import (
     create_single_exec_option_dimension,
     create_uncompressed_text_dimension)
 
+import logging
+
+logging.basicConfig(level=logging.INFO, format='%(threadName)s: %(message)s')
+LOG = logging.getLogger('test_configuration')
+
 @SkipIfS3.hive
 @SkipIfIsilon.hive
 @SkipIfLocal.hive
@@ -56,31 +61,30 @@ class TestHmsIntegrationSanity(ImpalaTestSuite):
     """Verifies that creating a catalog entity (database, table) in Impala 
using
     'IF NOT EXISTS' while the entity exists in HMS, does not throw an error."""
     # Create a database in Hive
-    ret = call(["hive", "-e", "create database test_db"])
-    assert ret == 0
+    self.run_stmt_in_hive("drop database if exists hms_sanity_db cascade")
+    self.run_stmt_in_hive("create database hms_sanity_db")
+    # Make sure Impala's metadata is in sync.
+    self.client.execute("invalidate metadata")
     # Creating a database with the same name using 'IF NOT EXISTS' in Impala should
     # not fail
-    self.client.execute("create database if not exists test_db")
+    self.client.execute("create database if not exists hms_sanity_db")
     # The database should appear in the catalog (IMPALA-2441)
-    assert 'test_db' in self.all_db_names()
+    assert 'hms_sanity_db' in self.all_db_names()
     # Ensure a table can be created in this database from Impala and that it is
     # accessible in both Impala and Hive
-    self.client.execute("create table if not exists 
test_db.test_tbl_in_impala(a int)")
-    ret = call(["hive", "-e", "select * from test_db.test_tbl_in_impala"])
-    assert ret == 0
-    self.client.execute("select * from test_db.test_tbl_in_impala")
-
+    self.client.execute("create table hms_sanity_db.test_tbl_in_impala(a int)")
+    self.run_stmt_in_hive("select * from hms_sanity_db.test_tbl_in_impala")
+    self.client.execute("select * from hms_sanity_db.test_tbl_in_impala")
     # Create a table in Hive
-    ret = call(["hive", "-e", "create table test_db.test_tbl (a int)"])
-    assert ret == 0
+    self.run_stmt_in_hive("create table hms_sanity_db.test_tbl (a int)")
     # Creating a table with the same name using 'IF NOT EXISTS' in Impala should
     # not fail
-    self.client.execute("create table if not exists test_db.test_tbl (a int)")
+    self.client.execute("create table if not exists hms_sanity_db.test_tbl (a 
int)")
     # The table should not appear in the catalog unless invalidate metadata is
     # executed
-    assert 'test_tbl' not in self.client.execute("show tables in test_db").data
-    self.client.execute("invalidate metadata test_db.test_tbl")
-    assert 'test_tbl' in self.client.execute("show tables in test_db").data
+    assert 'test_tbl' not in self.client.execute("show tables in 
hms_sanity_db").data
+    self.client.execute("invalidate metadata hms_sanity_db.test_tbl")
+    assert 'test_tbl' in self.client.execute("show tables in 
hms_sanity_db").data
 
 @SkipIfS3.hive
 @SkipIfIsilon.hive
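
A noteworthy change in this file is that the test no longer shells out via
call(["hive", "-e", ...]) and checks only the exit code; it uses the suite's run_stmt_in_hive
helper instead. A hedged sketch of what such a helper can look like (the actual method on
ImpalaTestSuite may use beeline or different options; run_hive_stmt below is purely
illustrative):

  import subprocess

  def run_hive_stmt(stmt):
      # check_output raises CalledProcessError on a non-zero exit and returns stdout,
      # so a failure surfaces as an exception and the caller can inspect the query
      # output, unlike call(), which only yields the return code.
      return subprocess.check_output(["hive", "-e", stmt])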

