IMPALA-3491: Use unique_database fixture in test_stale_metadata.py

Testing: I ran the test 10 times in a loop locally and ran a private
core/hdfs run.
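For context: the unique_database fixture used below is supplied by the test
framework's conftest and hands each test method its own database. The sketch
below shows the general shape of such a pytest fixture; it is illustrative
only, and the execute() stub and the naming scheme are assumptions, not the
actual conftest implementation:

  import pytest

  def execute(stmt):
    # Hypothetical stand-in for the Impala client used by the real fixture.
    print(stmt)

  @pytest.fixture
  def unique_database(request):
    # Derive a database name unique to the requesting test so that tests can
    # safely run in parallel, replacing the old class-level DATABASE field
    # built from random.randint().
    db_name = request.node.name.replace('[', '_').replace(']', '') + "_db"
    execute("DROP DATABASE IF EXISTS %s CASCADE" % db_name)
    execute("CREATE DATABASE %s" % db_name)
    yield db_name
    # Cleanup runs even if the test fails, which removes the need for the
    # setup_class()/teardown_class()/teardown_method() hooks deleted below.
    execute("DROP DATABASE IF EXISTS %s CASCADE" % db_name)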
Change-Id: Ibd058853e6b48671838e5b51611b6c34a7a8d39d
Reviewed-on: http://gerrit.cloudera.org:8080/2982
Reviewed-by: Michael Brown <[email protected]>
Tested-by: Internal Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/96e18f9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/96e18f9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/96e18f9e

Branch: refs/heads/master
Commit: 96e18f9e62f508a1a38aaf725a99a611de7c51be
Parents: 07bdb6d
Author: Alex Behm <[email protected]>
Authored: Thu May 5 10:38:02 2016 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Thu May 12 14:17:59 2016 -0700

----------------------------------------------------------------------
 tests/metadata/test_stale_metadata.py | 96 ++++++++++++------------------
 1 file changed, 39 insertions(+), 57 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/96e18f9e/tests/metadata/test_stale_metadata.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_stale_metadata.py b/tests/metadata/test_stale_metadata.py
index b2bbd22..f3c6db7 100644
--- a/tests/metadata/test_stale_metadata.py
+++ b/tests/metadata/test_stale_metadata.py
@@ -12,12 +12,6 @@ class TestRewrittenFile(ImpalaTestSuite):
   """Tests that we gracefully handle when a file in HDFS is rewritten outside of Impala
   without issuing "invalidate metadata"."""
 
-  # Create a unique database name so we can run multiple instances of this test class in
-  # parallel
-  DATABASE = "test_written_file_" + str(random.randint(0, 10**10))
-
-  TABLE_NAME = "alltypes_rewritten_file"
-  TABLE_LOCATION = get_fs_path("/test-warehouse/%s" % DATABASE)
   FILE_NAME = "alltypes.parq"
   # file size = 17.8 KB
   SHORT_FILE = get_fs_path("/test-warehouse/alltypesagg_parquet/year=2010/month=1/" \
@@ -40,101 +34,89 @@ class TestRewrittenFile(ImpalaTestSuite):
     cls.TestMatrix.add_constraint(
         lambda v: v.get_value('table_format').file_format == 'parquet')
 
-  @classmethod
-  def setup_class(cls):
-    super(TestRewrittenFile, cls).setup_class()
-    cls.cleanup_db(cls.DATABASE)
-    cls.client.execute("create database if not exists " + cls.DATABASE)
-
-  @classmethod
-  def teardown_class(cls):
-    cls.cleanup_db(cls.DATABASE)
-    super(TestRewrittenFile, cls).teardown_class()
-
-  def teardown_method(self, method):
-    self.__drop_test_table()
-
-  def __overwrite_file_and_query(self, vector, old_file, new_file, expected_error,
-      expected_new_count):
+  def __overwrite_file_and_query(self, db_name, table_name, old_file, new_file,
+      expected_error, expected_new_count):
     """Rewrites 'old_file' with 'new_file' without invalidating metadata and verifies
     that querying the table results in the expected error. 'expected_error' only needs
     to be a substring of the full error message."""
-    self.__create_test_table()
+    table_location = self.__get_test_table_location(db_name)
+    self.__create_test_table(db_name, table_name, table_location)
 
     # First copy in 'old_file' and refresh the cached file metadata.
-    self.__copy_file_to_test_table(old_file)
-    self.client.execute("refresh %s" % self.__full_table_name())
+    self.__copy_file_to_test_table(old_file, table_location)
+    self.client.execute("refresh %s.%s" % (db_name, table_name))
 
     # Then overwrite 'old_file' with 'new_file', and don't invalidate metadata.
-    self.__copy_file_to_test_table(new_file)
+    self.__copy_file_to_test_table(new_file, table_location)
 
     # Query the table and check for expected error.
     try:
-      result = self.client.execute("select * from %s" % self.__full_table_name())
+      result = self.client.execute("select * from %s.%s" % (db_name, table_name))
       assert False, "Query was expected to fail"
     except ImpalaBeeswaxException as e:
       assert expected_error in str(e)
 
     # Refresh the table and make sure we get results
-    self.client.execute("refresh %s" % self.__full_table_name())
-    result = self.client.execute("select count(*) from %s" % self.__full_table_name())
+    self.client.execute("refresh %s.%s" % (db_name, table_name))
+    result = self.client.execute("select count(*) from %s.%s" % (db_name, table_name))
     assert result.data == [str(expected_new_count)]
 
   @SkipIfS3.jira(reason="IMPALA-2512")
-  def test_new_file_shorter(self, vector):
+  def test_new_file_shorter(self, vector, unique_database):
     """Rewrites an existing file with a new shorter file."""
     # Full error is something like:
     #   Metadata for file '...' appears stale. Try running "refresh
-    #   test_written_file_xxx.alltypes_rewritten_file" to reload the file metadata.
-    self.__overwrite_file_and_query(vector, self.LONG_FILE, self.SHORT_FILE,
-        'appears stale.', self.SHORT_FILE_NUM_ROWS)
+    #   unique_database_name.new_file_shorter" to reload the file metadata.
+    table_name = "new_file_shorter"
+    self.__overwrite_file_and_query(unique_database, table_name,
+        self.LONG_FILE, self.SHORT_FILE, 'appears stale.', self.SHORT_FILE_NUM_ROWS)
 
-  def test_new_file_longer(self, vector):
+  def test_new_file_longer(self, vector, unique_database):
     """Rewrites an existing file with a new longer file."""
     # Full error is something like:
     #   File '...' has an invalid version number: ff4C
     #   This could be due to stale metadata. Try running "refresh
-    #   test_written_file_xxx.alltypes_rewritten_file".
-    self.__overwrite_file_and_query(vector, self.SHORT_FILE, self.LONG_FILE,
-        'invalid version number', self.LONG_FILE_NUM_ROWS)
+    #   unique_database_name.new_file_longer".
+    table_name = "new_file_longer"
+    self.__overwrite_file_and_query(unique_database, table_name,
+        self.SHORT_FILE, self.LONG_FILE, 'invalid version number', self.LONG_FILE_NUM_ROWS)
 
-  def test_delete_file(self, vector):
+  def test_delete_file(self, vector, unique_database):
     """Deletes an existing file without refreshing metadata."""
-    self.__create_test_table()
+    table_name = "delete_file"
+    table_location = self.__get_test_table_location(unique_database)
+    self.__create_test_table(unique_database, table_name, table_location)
 
     # Copy in a file and refresh the cached file metadata.
-    self.__copy_file_to_test_table(self.LONG_FILE)
-    self.client.execute("refresh %s" % self.__full_table_name())
+    self.__copy_file_to_test_table(self.LONG_FILE, table_location)
+    self.client.execute("refresh %s.%s" % (unique_database, table_name))
 
     # Delete the file without refreshing metadata.
-    check_call(["hadoop", "fs", "-rm", self.TABLE_LOCATION + '/*'], shell=False)
+    check_call(["hadoop", "fs", "-rm", table_location + '/*'], shell=False)
 
     # Query the table and check for expected error.
     try:
-      result = self.client.execute("select * from %s" % self.__full_table_name())
+      result = self.client.execute("select * from %s.%s" % (unique_database, table_name))
       assert False, "Query was expected to fail"
     except ImpalaBeeswaxException as e:
       assert 'No such file or directory' in str(e)
 
     # Refresh the table and make sure we get results
-    self.client.execute("refresh %s" % self.__full_table_name())
-    result = self.client.execute("select count(*) from %s" % self.__full_table_name())
+    self.client.execute("refresh %s.%s" % (unique_database, table_name))
+    result = self.client.execute("select count(*) from %s.%s"\
+        % (unique_database, table_name))
     assert result.data == ['0']
 
-  def __create_test_table(self):
-    self.__drop_test_table()
+  def __get_test_table_location(self, db_name):
+    return get_fs_path("/test-warehouse/%s" % db_name)
+
+  def __create_test_table(self, db_name, table_name, table_location):
     self.client.execute("""
-      CREATE TABLE %s LIKE functional.alltypesnopart STORED AS PARQUET
+      CREATE TABLE %s.%s LIKE functional.alltypesnopart STORED AS PARQUET
       LOCATION '%s'
-    """ % (self.__full_table_name(), self.TABLE_LOCATION))
+    """ % (db_name, table_name, table_location))
 
-  def __drop_test_table(self):
-    self.client.execute("DROP TABLE IF EXISTS %s" % self.__full_table_name())
-
-  def __copy_file_to_test_table(self, src_path):
+  def __copy_file_to_test_table(self, src_path, table_location):
     """Copies the provided path to the test table, overwriting any previous file."""
-    dst_path = "%s/%s" % (self.TABLE_LOCATION, self.FILE_NAME)
+    dst_path = "%s/%s" % (table_location, self.FILE_NAME)
     check_call(["hadoop", "fs", "-cp", "-f", src_path, dst_path], shell=False)
-
-  def __full_table_name(self):
-    return "%s.%s" % (self.DATABASE, self.TABLE_NAME)
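With the fixture in place, a test that needs an isolated database simply
declares unique_database as a parameter and uses the yielded name to qualify
table references, as the rewritten tests above do. A minimal, hypothetical
example (the table name 't' is illustrative):

  def test_example(self, vector, unique_database):
    # 'unique_database' names a database created just for this test; the
    # framework drops it, and any tables in it, when the test finishes.
    self.client.execute("create table %s.t (i int)" % unique_database)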
