Repository: incubator-impala
Updated Branches:
  refs/heads/master f915d59aa -> 14cdb0497
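This patch moves the metadata and query tests off the WebHDFS-specific hdfs_client and onto a filesystem-agnostic filesystem_client built on the new BaseFilesystem interface (added in tests/util/filesystem_base.py below), so the same assertions can run against HDFS, S3, or the local filesystem. A minimal sketch of the before/after pattern that repeats throughout the diffs below; the warehouse path and expected file count are hypothetical, and the snippet assumes it runs inside an ImpalaTestSuite test method:

  # Before: assertions were written against the raw WebHDFS response dict
  # returned by pywebhdfs' list_dir().
  ls = self.hdfs_client.list_dir("test-warehouse/some_db.db/t1/")
  assert len(ls['FileStatuses']['FileStatus']) == 1

  # After: BaseFilesystem.ls() returns a plain list of file/dir names, so the
  # same assertion no longer depends on the WebHDFS response structure.
  assert len(self.filesystem_client.ls("test-warehouse/some_db.db/t1/")) == 1

Keeping ls() (and exists(), get_all_file_sizes(), etc.) to plain Python types is what allows an S3-backed or local-filesystem client to be swapped in without rewriting the assertions.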
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/metadata/test_ddl.py ---------------------------------------------------------------------- diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py index 9a38205..d40f77c 100644 --- a/tests/metadata/test_ddl.py +++ b/tests/metadata/test_ddl.py @@ -28,9 +28,9 @@ from tests.util.filesystem_utils import WAREHOUSE, IS_LOCAL # Validates DDL statements (create, drop) class TestDdlStatements(ImpalaTestSuite): - TEST_DBS = ['ddl_test_db', 'ddl_purge_db', 'alter_table_test_db', 'alter_table_test_db2', - 'function_ddl_test', 'udf_test', 'data_src_test', 'truncate_table_test_db', - 'test_db', 'alter_purge_db', 'db_with_comment'] + TEST_DBS = ['ddl_test_db', 'ddl_purge_db', 'alter_table_test_db', + 'alter_table_test_db2', 'function_ddl_test', 'udf_test', 'data_src_test', + 'truncate_table_test_db', 'test_db', 'alter_purge_db', 'db_with_comment'] @classmethod def get_workload(self): @@ -65,9 +65,9 @@ class TestDdlStatements(ImpalaTestSuite): # time a table is created with the same location. This also helps remove any stale # data from the last test run. for dir_ in ['part_data', 't1_tmp1', 't_part_tmp']: - self.hdfs_client.delete_file_dir('test-warehouse/%s' % dir_, recursive=True) + self.filesystem_client.delete_file_dir('test-warehouse/%s' % dir_, recursive=True) - @SkipIfS3.hdfs_client # S3: missing coverage: drop table/partition with PURGE + @SkipIfS3.hdfs_purge @SkipIfLocal.hdfs_client @pytest.mark.execute_serially def test_drop_table_with_purge(self): @@ -80,31 +80,31 @@ class TestDdlStatements(ImpalaTestSuite): self.client.execute("create table {0}.t2(i int)".format(DDL_PURGE_DB)) # Create sample test data files under the table directories self.hdfs_client.create_file("test-warehouse/{0}.db/t1/t1.txt".format(DDL_PURGE_DB),\ - file_data='t1') + file_data='t1') self.hdfs_client.create_file("test-warehouse/{0}.db/t2/t2.txt".format(DDL_PURGE_DB),\ - file_data='t2') + file_data='t2') # Drop the table (without purge) and make sure it exists in trash self.client.execute("drop table {0}.t1".format(DDL_PURGE_DB)) assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/t1.txt".\ - format(DDL_PURGE_DB)) + format(DDL_PURGE_DB)) assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/".format(DDL_PURGE_DB)) assert self.hdfs_client.exists(\ - "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/t1.txt".\ - format(getpass.getuser(), DDL_PURGE_DB)) + "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/t1.txt".\ + format(getpass.getuser(), DDL_PURGE_DB)) assert self.hdfs_client.exists(\ - "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1".\ - format(getpass.getuser(), DDL_PURGE_DB)) + "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1".\ + format(getpass.getuser(), DDL_PURGE_DB)) # Drop the table (with purge) and make sure it doesn't exist in trash self.client.execute("drop table {0}.t2 purge".format(DDL_PURGE_DB)) assert not self.hdfs_client.exists("test-warehouse/{0}.db/t2/".format(DDL_PURGE_DB)) assert not self.hdfs_client.exists("test-warehouse/{0}.db/t2/t2.txt".\ - format(DDL_PURGE_DB)) + format(DDL_PURGE_DB)) assert not self.hdfs_client.exists(\ - "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t2/t2.txt".\ - format(getpass.getuser(), DDL_PURGE_DB)) + "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t2/t2.txt".\ + format(getpass.getuser(), DDL_PURGE_DB)) assert not self.hdfs_client.exists(\ - "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t2".\ - format(getpass.getuser(), DDL_PURGE_DB)) + 
"/user/{0}/.Trash/Current/test-warehouse/{1}.db/t2".\ + format(getpass.getuser(), DDL_PURGE_DB)) # Create an external table t3 and run the same test as above. Make # sure the data is not deleted self.hdfs_client.make_dir("test-warehouse/data_t3/", permission=777) @@ -115,101 +115,100 @@ class TestDdlStatements(ImpalaTestSuite): assert self.hdfs_client.exists("test-warehouse/data_t3/data.txt") self.hdfs_client.delete_file_dir("test-warehouse/data_t3", recursive=True) - @SkipIfS3.hdfs_client # S3: missing coverage: drop table/database @SkipIfLocal.hdfs_client @pytest.mark.execute_serially def test_drop_cleans_hdfs_dirs(self): DDL_TEST_DB = "ddl_test_db" - self.hdfs_client.delete_file_dir("test-warehouse/ddl_test_db.db/", recursive=True) - assert not self.hdfs_client.exists("test-warehouse/ddl_test_db.db/") + self.filesystem_client.delete_file_dir("test-warehouse/ddl_test_db.db/", + recursive=True) + assert not self.filesystem_client.exists("test-warehouse/ddl_test_db.db/") self.client.execute('use default') self._create_db(DDL_TEST_DB) # Verify the db directory exists - assert self.hdfs_client.exists("test-warehouse/{0}.db/".format(DDL_TEST_DB)) + assert self.filesystem_client.exists("test-warehouse/{0}.db/".format(DDL_TEST_DB)) self.client.execute("create table {0}.t1(i int)".format(DDL_TEST_DB)) # Verify the table directory exists - assert self.hdfs_client.exists("test-warehouse/{0}.db/t1/".format(DDL_TEST_DB)) + assert self.filesystem_client.exists("test-warehouse/{0}.db/t1/".format(DDL_TEST_DB)) # Dropping the table removes the table's directory and preserves the db's directory self.client.execute("drop table {0}.t1".format(DDL_TEST_DB)) - assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/".format(DDL_TEST_DB)) - assert self.hdfs_client.exists("test-warehouse/{0}.db/".format(DDL_TEST_DB)) + assert not self.filesystem_client.exists( + "test-warehouse/{0}.db/t1/".format(DDL_TEST_DB)) + assert self.filesystem_client.exists("test-warehouse/{0}.db/".format(DDL_TEST_DB)) # Dropping the db removes the db's directory self.client.execute("drop database {0}".format(DDL_TEST_DB)) - assert not self.hdfs_client.exists("test-warehouse/{0}.db/".format(DDL_TEST_DB)) + assert not self.filesystem_client.exists("test-warehouse/{0}.db/".format(DDL_TEST_DB)) # Dropping the db using "cascade" removes all tables' and db's directories # but keeps the external tables' directory self._create_db(DDL_TEST_DB) self.client.execute("create table {0}.t1(i int)".format(DDL_TEST_DB)) self.client.execute("create table {0}.t2(i int)".format(DDL_TEST_DB)) - self.client.execute("create external table {0}.t3(i int) " - "location '/test-warehouse/{0}/t3/'".format(DDL_TEST_DB)) + result = self.client.execute("create external table {0}.t3(i int) " + "location '{1}/{0}/t3/'".format(DDL_TEST_DB, WAREHOUSE)) self.client.execute("drop database {0} cascade".format(DDL_TEST_DB)) - assert not self.hdfs_client.exists("test-warehouse/{0}.db/".format(DDL_TEST_DB)) - assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/".format(DDL_TEST_DB)) - assert not self.hdfs_client.exists("test-warehouse/{0}.db/t2/".format(DDL_TEST_DB)) - assert self.hdfs_client.exists("test-warehouse/{0}/t3/".format(DDL_TEST_DB)) - self.hdfs_client.delete_file_dir("test-warehouse/{0}/t3/".format(DDL_TEST_DB), + assert not self.filesystem_client.exists("test-warehouse/{0}.db/".format(DDL_TEST_DB)) + assert not self.filesystem_client.exists( + "test-warehouse/{0}.db/t1/".format(DDL_TEST_DB)) + assert not self.filesystem_client.exists( + 
"test-warehouse/{0}.db/t2/".format(DDL_TEST_DB)) + assert self.filesystem_client.exists("test-warehouse/{0}/t3/".format(DDL_TEST_DB)) + self.filesystem_client.delete_file_dir("test-warehouse/{0}/t3/".format(DDL_TEST_DB), recursive=True) - assert not self.hdfs_client.exists("test-warehouse/{0}/t3/".format(DDL_TEST_DB)) + assert not self.filesystem_client.exists("test-warehouse/{0}/t3/".format(DDL_TEST_DB)) - @SkipIfS3.insert # S3: missing coverage: truncate table @SkipIfLocal.hdfs_client @pytest.mark.execute_serially def test_truncate_cleans_hdfs_files(self): TRUNCATE_DB = "truncate_table_test_db" - self.hdfs_client.delete_file_dir("test-warehouse/%s.db/" % TRUNCATE_DB, + self.filesystem_client.delete_file_dir("test-warehouse/%s.db/" % TRUNCATE_DB, recursive=True) - assert not self.hdfs_client.exists("test-warehouse/%s.db/" % TRUNCATE_DB) + assert not self.filesystem_client.exists("test-warehouse/%s.db/" % TRUNCATE_DB) self._create_db(TRUNCATE_DB, sync=True) # Verify the db directory exists - assert self.hdfs_client.exists("test-warehouse/%s.db/" % TRUNCATE_DB) + assert self.filesystem_client.exists("test-warehouse/%s.db/" % TRUNCATE_DB) self.client.execute("create table %s.t1(i int)" % TRUNCATE_DB) # Verify the table directory exists - assert self.hdfs_client.exists("test-warehouse/truncate_table_test_db.db/t1/") + assert self.filesystem_client.exists("test-warehouse/truncate_table_test_db.db/t1/") # Should have created one file in the table's dir self.client.execute("insert into %s.t1 values (1)" % TRUNCATE_DB) - ls = self.hdfs_client.list_dir("test-warehouse/%s.db/t1/" % TRUNCATE_DB) - assert len(ls['FileStatuses']['FileStatus']) == 2 + assert len(self.filesystem_client.ls("test-warehouse/%s.db/t1/" % TRUNCATE_DB)) == 2 # Truncating the table removes the data files and preserves the table's directory self.client.execute("truncate table %s.t1" % TRUNCATE_DB) - ls = self.hdfs_client.list_dir("test-warehouse/%s.db/t1/" % TRUNCATE_DB) - assert len(ls['FileStatuses']['FileStatus']) == 1 + assert len(self.filesystem_client.ls("test-warehouse/%s.db/t1/" % TRUNCATE_DB)) == 1 self.client.execute( "create table %s.t2(i int) partitioned by (p int)" % TRUNCATE_DB) # Verify the table directory exists - assert self.hdfs_client.exists("test-warehouse/%s.db/t2/" % TRUNCATE_DB) + assert self.filesystem_client.exists("test-warehouse/%s.db/t2/" % TRUNCATE_DB) # Should have created the partition dir, which should contain exactly one file self.client.execute( "insert into %s.t2 partition(p=1) values (1)" % TRUNCATE_DB) - ls = self.hdfs_client.list_dir("test-warehouse/%s.db/t2/p=1" % TRUNCATE_DB) - assert len(ls['FileStatuses']['FileStatus']) == 1 + assert len(self.filesystem_client.ls( + "test-warehouse/%s.db/t2/p=1" % TRUNCATE_DB)) == 1 # Truncating the table removes the data files and preserves the partition's directory self.client.execute("truncate table %s.t2" % TRUNCATE_DB) - assert self.hdfs_client.exists("test-warehouse/%s.db/t2/p=1" % TRUNCATE_DB) - ls = self.hdfs_client.list_dir("test-warehouse/%s.db/t2/p=1" % TRUNCATE_DB) - assert len(ls['FileStatuses']['FileStatus']) == 0 + assert self.filesystem_client.exists("test-warehouse/%s.db/t2/p=1" % TRUNCATE_DB) + assert len(self.filesystem_client.ls( + "test-warehouse/%s.db/t2/p=1" % TRUNCATE_DB)) == 0 - @SkipIfS3.insert # S3: missing coverage: truncate table @pytest.mark.execute_serially def test_truncate_table(self, vector): vector.get_value('exec_option')['abort_on_error'] = False self._create_db('truncate_table_test_db', sync=True) - 
self.run_test_case('QueryTest/truncate-table', vector, use_db='truncate_table_test_db', + self.run_test_case('QueryTest/truncate-table', vector, + use_db='truncate_table_test_db', multiple_impalad=self._use_multiple_impalad(vector)) - @SkipIfS3.insert @pytest.mark.execute_serially def test_create_table(self, vector): vector.get_value('exec_option')['abort_on_error'] = False @@ -270,7 +269,6 @@ class TestDdlStatements(ImpalaTestSuite): assert 'test_tbl' in self.client.execute("show tables in test_db").data @SkipIf.kudu_not_supported - @SkipIfS3.insert @pytest.mark.execute_serially def test_create_kudu(self, vector): self.expected_exceptions = 2 @@ -291,15 +289,15 @@ class TestDdlStatements(ImpalaTestSuite): assert 'ddl_test_db' not in self.all_db_names() # TODO: don't use hdfs_client - @SkipIfS3.insert # S3: missing coverage: alter table @SkipIfLocal.hdfs_client @pytest.mark.execute_serially def test_alter_table(self, vector): vector.get_value('exec_option')['abort_on_error'] = False # Create directory for partition data that does not use the (key=value) # format. - self.hdfs_client.make_dir("test-warehouse/part_data/", permission=777) - self.hdfs_client.create_file("test-warehouse/part_data/data.txt", file_data='1984') + self.filesystem_client.make_dir("test-warehouse/part_data/", permission=777) + self.filesystem_client.create_file( + "test-warehouse/part_data/data.txt", file_data='1984') # Create test databases self._create_db('alter_table_test_db', sync=True) @@ -307,7 +305,7 @@ class TestDdlStatements(ImpalaTestSuite): self.run_test_case('QueryTest/alter-table', vector, use_db='alter_table_test_db', multiple_impalad=self._use_multiple_impalad(vector)) - @SkipIfS3.hdfs_client # S3: missing coverage: alter table drop partition + @SkipIfS3.hdfs_purge # S3: missing coverage: alter table drop partition @SkipIfLocal.hdfs_client @pytest.mark.execute_serially def test_drop_partition_with_purge(self, vector): @@ -327,28 +325,28 @@ class TestDdlStatements(ImpalaTestSuite): # Drop the partition (j=1) without purge and make sure it exists in trash self.client.execute("alter table {0}.t1 drop partition(j=1)".format(ALTER_PURGE_DB)); assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/j=1/j1.txt".\ - format(ALTER_PURGE_DB)) + format(ALTER_PURGE_DB)) assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/j=1".\ - format(ALTER_PURGE_DB)) + format(ALTER_PURGE_DB)) assert self.hdfs_client.exists(\ - "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=1/j1.txt".\ - format(getpass.getuser(), ALTER_PURGE_DB)) + "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=1/j1.txt".\ + format(getpass.getuser(), ALTER_PURGE_DB)) assert self.hdfs_client.exists(\ - "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=1".\ - format(getpass.getuser(), ALTER_PURGE_DB)) + "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=1".\ + format(getpass.getuser(), ALTER_PURGE_DB)) # Drop the partition (with purge) and make sure it doesn't exist in trash self.client.execute("alter table {0}.t1 drop partition(j=2) purge".\ - format(ALTER_PURGE_DB)); + format(ALTER_PURGE_DB)); assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/j=2/j2.txt".\ - format(ALTER_PURGE_DB)) + format(ALTER_PURGE_DB)) assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/j=2".\ - format(ALTER_PURGE_DB)) + format(ALTER_PURGE_DB)) assert not self.hdfs_client.exists(\ - "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=2".\ - format(getpass.getuser(), ALTER_PURGE_DB)) + "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=2".\ 
+ format(getpass.getuser(), ALTER_PURGE_DB)) assert not self.hdfs_client.exists( - "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=2/j2.txt".\ - format(getpass.getuser(), ALTER_PURGE_DB)) + "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=2/j2.txt".\ + format(getpass.getuser(), ALTER_PURGE_DB)) @pytest.mark.execute_serially def test_views_ddl(self, vector): @@ -434,11 +432,11 @@ class TestDdlStatements(ImpalaTestSuite): def create_drop_ddl(self, vector, db_name, create_stmts, drop_stmts, select_stmt, num_iterations=3): - """ Helper method to run CREATE/DROP DDL commands repeatedly and exercise the lib cache - create_stmts is the list of CREATE statements to be executed in order drop_stmts is - the list of DROP statements to be executed in order. Each statement should have a - '%s' placeholder to insert "IF EXISTS" or "". The select_stmt is just a single - statement to test after executing the CREATE statements. + """ Helper method to run CREATE/DROP DDL commands repeatedly and exercise the lib + cache create_stmts is the list of CREATE statements to be executed in order + drop_stmts is the list of DROP statements to be executed in order. Each statement + should have a '%s' placeholder to insert "IF EXISTS" or "". The select_stmt is just a + single statement to test after executing the CREATE statements. TODO: it's hard to tell that the cache is working (i.e. if it did nothing to drop the cache, these tests would still pass). Testing that is a bit harder and requires us to update the udf binary in the middle. @@ -454,7 +452,6 @@ class TestDdlStatements(ImpalaTestSuite): self.client.execute(select_stmt) for drop_stmt in drop_stmts: self.client.execute(drop_stmt % ("")) - @SkipIfS3.insert # S3: missing coverage: alter table partitions. @pytest.mark.execute_serially def test_create_alter_bulk_partition(self, vector): TBL_NAME = 'foo_part' @@ -478,8 +475,8 @@ class TestDdlStatements(ImpalaTestSuite): " set fileformat parquetfile" % TBL_NAME) # Alter one partition to a non-existent location twice (IMPALA-741) - self.hdfs_client.delete_file_dir("tmp/dont_exist1/", recursive=True) - self.hdfs_client.delete_file_dir("tmp/dont_exist2/", recursive=True) + self.filesystem_client.delete_file_dir("tmp/dont_exist1/", recursive=True) + self.filesystem_client.delete_file_dir("tmp/dont_exist2/", recursive=True) self.execute_query_expect_success(self.client, "alter table {0} partition(j=1,s='1') set location '{1}/tmp/dont_exist1'" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/metadata/test_explain.py ---------------------------------------------------------------------- diff --git a/tests/metadata/test_explain.py b/tests/metadata/test_explain.py index 1bc6011..0719a21 100644 --- a/tests/metadata/test_explain.py +++ b/tests/metadata/test_explain.py @@ -97,33 +97,33 @@ class TestExplainEmptyPartition(ImpalaTestSuite): def teardown_method(self, method): self.cleanup_db(self.TEST_DB_NAME) - @SkipIfS3.hdfs_client @SkipIfLocal.hdfs_client def test_non_empty_partition_0_rows(self): """Regression test for IMPALA-1708: if a partition has 0 rows but > 0 files after COMPUTE STATS, don't warn the user about missing stats. 
The files are probably corrupted, or used for something else.""" self.client.execute("SET EXPLAIN_LEVEL=3") - self.client.execute( - "CREATE TABLE %s.empty_partition (col int) partitioned by (p int)" % self.TEST_DB_NAME); + self.client.execute("CREATE TABLE %s.empty_partition (col int) " + "partitioned by (p int)" % self.TEST_DB_NAME) self.client.execute( "ALTER TABLE %s.empty_partition ADD PARTITION (p=NULL)" % self.TEST_DB_NAME) # Put an empty file in the partition so we have > 0 files, but 0 rows - self.hdfs_client.create_file( - "test-warehouse/%s.db/empty_partition/p=__HIVE_DEFAULT_PARTITION__/empty" % - self.TEST_DB_NAME, "") + self.filesystem_client.create_file( + "test-warehouse/%s.db/empty_partition/p=__HIVE_DEFAULT_PARTITION__/empty" % + self.TEST_DB_NAME, "") self.client.execute("REFRESH %s.empty_partition" % self.TEST_DB_NAME) self.client.execute("COMPUTE STATS %s.empty_partition" % self.TEST_DB_NAME) assert "NULL\t0\t1" in str( - self.client.execute("SHOW PARTITIONS %s.empty_partition" % self.TEST_DB_NAME)) + self.client.execute("SHOW PARTITIONS %s.empty_partition" % self.TEST_DB_NAME)) assert "missing relevant table and/or column statistics" not in str( - self.client.execute("EXPLAIN SELECT * FROM %s.empty_partition" % self.TEST_DB_NAME)) + self.client.execute( + "EXPLAIN SELECT * FROM %s.empty_partition" % self.TEST_DB_NAME)) # Now add a partition with some data (so it gets selected into the scan), to check # that its lack of stats is correctly identified self.client.execute( "ALTER TABLE %s.empty_partition ADD PARTITION (p=1)" % self.TEST_DB_NAME) - self.hdfs_client.create_file("test-warehouse/%s.db/empty_partition/p=1/rows" % + self.filesystem_client.create_file("test-warehouse/%s.db/empty_partition/p=1/rows" % self.TEST_DB_NAME, "1") self.client.execute("REFRESH %s.empty_partition" % self.TEST_DB_NAME) explain_result = str( http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/metadata/test_hdfs_encryption.py ---------------------------------------------------------------------- diff --git a/tests/metadata/test_hdfs_encryption.py b/tests/metadata/test_hdfs_encryption.py index 049734d..037b4a0 100644 --- a/tests/metadata/test_hdfs_encryption.py +++ b/tests/metadata/test_hdfs_encryption.py @@ -28,7 +28,7 @@ PYWEBHDFS_TMP_DIR = 'tmp/test_encryption_load_data' TMP_DIR = '/%s' % (PYWEBHDFS_TMP_DIR) [email protected]_data [email protected]_encryption @SkipIfIsilon.hdfs_encryption @SkipIfLocal.hdfs_encryption @pytest.mark.execute_serially @@ -136,7 +136,6 @@ class TestHdfsEncryption(ImpalaTestSuite): else: self.client.execute('load data inpath \'%s\' into table tbl ' % (TMP_DIR)) - @SkipIfS3.hdfs_client @SkipIfIsilon.hdfs_encryption @pytest.mark.execute_serially def test_drop_partition_encrypt(self): @@ -187,7 +186,6 @@ class TestHdfsEncryption(ImpalaTestSuite): assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/j=3/j3.txt".format(TEST_DB)) assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/j=3".format(TEST_DB)) - @SkipIfS3.hdfs_client @SkipIfIsilon.hdfs_encryption @pytest.mark.execute_serially def test_drop_table_encrypt(self): http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/metadata/test_hdfs_permissions.py ---------------------------------------------------------------------- diff --git a/tests/metadata/test_hdfs_permissions.py b/tests/metadata/test_hdfs_permissions.py index 4d72354..05a6ac4 100644 --- a/tests/metadata/test_hdfs_permissions.py +++ b/tests/metadata/test_hdfs_permissions.py @@ -25,7 +25,7 @@ 
TEST_TBL = 'read_only_tbl' TBL_LOC = '%s/%s' % (WAREHOUSE, TEST_TBL) [email protected] [email protected]_acls @SkipIfLocal.hdfs_client class TestHdfsPermissions(ImpalaTestSuite): @classmethod http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/metadata/test_last_ddl_time_update.py ---------------------------------------------------------------------- diff --git a/tests/metadata/test_last_ddl_time_update.py b/tests/metadata/test_last_ddl_time_update.py index dfb89f8..cbcb105 100644 --- a/tests/metadata/test_last_ddl_time_update.py +++ b/tests/metadata/test_last_ddl_time_update.py @@ -72,7 +72,6 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite): self.run_test("alter table %s set fileformat textfile" % FULL_NAME, True) @pytest.mark.execute_serially - @SkipIfS3.insert def test_insert(self, vector): # static partition insert self.run_test("insert into %s partition(j=1, s='2012') " http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/metadata/test_load.py ---------------------------------------------------------------------- diff --git a/tests/metadata/test_load.py b/tests/metadata/test_load.py index bdf2db0..47703ec 100644 --- a/tests/metadata/test_load.py +++ b/tests/metadata/test_load.py @@ -7,7 +7,7 @@ from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.test_dimensions import ( create_single_exec_option_dimension, create_uncompressed_text_dimension) -from tests.common.skip import SkipIfS3, SkipIfLocal +from tests.common.skip import SkipIfLocal from tests.util.filesystem_utils import WAREHOUSE TEST_TBL_PART = "test_load" @@ -18,7 +18,6 @@ MULTIAGG_PATH = 'test-warehouse/alltypesaggmultifiles/year=2010/month=1/day=1' HIDDEN_FILES = ["{0}/3/.100101.txt".format(STAGING_PATH), "{0}/3/_100101.txt".format(STAGING_PATH)] [email protected]_data @SkipIfLocal.hdfs_client class TestLoadData(ImpalaTestSuite): @@ -35,7 +34,7 @@ class TestLoadData(ImpalaTestSuite): def _clean_test_tables(self): self.client.execute("drop table if exists functional.{0}".format(TEST_TBL_NOPART)) self.client.execute("drop table if exists functional.{0}".format(TEST_TBL_PART)) - self.hdfs_client.delete_file_dir(STAGING_PATH, recursive=True) + self.filesystem_client.delete_file_dir(STAGING_PATH, recursive=True) def teardown_method(self, method): self._clean_test_tables() @@ -47,34 +46,32 @@ class TestLoadData(ImpalaTestSuite): # Create staging directories for load data inpath. The staging directory is laid out # as follows: # - It has 6 sub directories, numbered 1-6 - # - The directories are populated with files from a subset of partitions in existing - # partitioned tables. + # - The directories are populated with files from a subset of partitions in + # existing partitioned tables. # - Sub Directories 1-4 have single files copied from alltypes/ # - Sub Directories 5-6 have multiple files (4) copied from alltypesaggmultifiles # - Sub Directory 3 also has hidden files, in both supported formats. # - All sub-dirs contain a hidden directory for i in xrange(1, 6): stagingDir = '{0}/{1}'.format(STAGING_PATH, i) - self.hdfs_client.make_dir(stagingDir, permission=777) - self.hdfs_client.make_dir('{0}/_hidden_dir'.format(stagingDir), permission=777) - + self.filesystem_client.make_dir(stagingDir, permission=777) + self.filesystem_client.make_dir('{0}/_hidden_dir'.format(stagingDir), + permission=777) # Copy single file partitions from alltypes. 
for i in xrange(1, 4): - self.hdfs_client.copy(ALLTYPES_PATH, "{0}/{1}/100101.txt".format(STAGING_PATH, i)) - + self.filesystem_client.copy(ALLTYPES_PATH, + "{0}/{1}/100101.txt".format(STAGING_PATH, i)) # Copy multi file partitions from alltypesaggmultifiles. - file_infos = self.hdfs_client.list_dir( - MULTIAGG_PATH).get('FileStatuses').get('FileStatus') - file_names = [info.get('pathSuffix') for info in file_infos] + file_names = self.filesystem_client.ls(MULTIAGG_PATH) for i in xrange(4, 6): for file_ in file_names: - self.hdfs_client.copy( + self.filesystem_client.copy( "{0}/{1}".format(MULTIAGG_PATH, file_), '{0}/{1}/{2}'.format(STAGING_PATH, i, file_)) # Create two hidden files, with a leading . and _ for file_ in HIDDEN_FILES: - self.hdfs_client.copy(ALLTYPES_PATH, file_) + self.filesystem_client.copy(ALLTYPES_PATH, file_) # Create both the test tables. self.client.execute("create table functional.{0} like functional.alltypes" @@ -86,5 +83,4 @@ class TestLoadData(ImpalaTestSuite): self.run_test_case('QueryTest/load', vector) # The hidden files should not have been moved as part of the load operation. for file_ in HIDDEN_FILES: - assert self.hdfs_client.get_file_dir_status(file_), "{0} does not exist".format( - file_) + assert self.filesystem_client.exists(file_), "{0} does not exist".format(file_) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/metadata/test_partition_metadata.py ---------------------------------------------------------------------- diff --git a/tests/metadata/test_partition_metadata.py b/tests/metadata/test_partition_metadata.py index 5c980bb..228bec4 100644 --- a/tests/metadata/test_partition_metadata.py +++ b/tests/metadata/test_partition_metadata.py @@ -56,7 +56,6 @@ class TestPartitionMetadata(ImpalaTestSuite): def teardown_method(self, method): self.cleanup_db(self.TEST_DB) - @SkipIfS3.insert # S3: missing coverage: partition DDL @SkipIfLocal.hdfs_client def test_multiple_partitions_same_location(self, vector): """Regression test for IMPALA-597. Verifies Impala is able to properly read @@ -64,16 +63,16 @@ class TestPartitionMetadata(ImpalaTestSuite): """ self.client.execute("use %s" % self.TEST_DB) impala_location = '%s/%s.db/%s' % (WAREHOUSE, self.TEST_DB, self.TEST_TBL) - hdfs_client_location = impala_location.split("/")[-1] + filesystem_client_location = impala_location.split("/")[-1] # Cleanup any existing data in the table directory. - self.hdfs_client.delete_file_dir(hdfs_client_location, recursive=True) + self.filesystem_client.delete_file_dir(filesystem_client_location, recursive=True) # Create the table self.client.execute("create table {0}(i int) partitioned by(j int)" "location '{1}/{2}.db/{0}'".format(self.TEST_TBL, WAREHOUSE, self.TEST_DB)) # Point multiple partitions to the same location and use partition locations that # do not contain a key=value path. - self.hdfs_client.make_dir(hdfs_client_location + '/p') + self.filesystem_client.make_dir(filesystem_client_location + '/p') # Point both partitions to the same location. 
self.client.execute("alter table %s add partition (j=1) location '%s/p'" % http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/metadata/test_recover_partitions.py ---------------------------------------------------------------------- diff --git a/tests/metadata/test_recover_partitions.py b/tests/metadata/test_recover_partitions.py index 9133b07..a219433 100644 --- a/tests/metadata/test_recover_partitions.py +++ b/tests/metadata/test_recover_partitions.py @@ -16,7 +16,7 @@ import pytest from tests.common.test_dimensions import ALL_NODES_ONLY from tests.common.impala_test_suite import * -from tests.common.skip import SkipIfS3, SkipIfLocal +from tests.common.skip import SkipIfLocal from tests.util.filesystem_utils import WAREHOUSE, IS_DEFAULT_FS # Validates ALTER TABLE RECOVER PARTITIONS statement @@ -59,7 +59,6 @@ class TestRecoverPartitions(ImpalaTestSuite): def teardown_method(self, method): self.cleanup_db(self.TEST_DB) - @SkipIfS3.insert @SkipIfLocal.hdfs_client @pytest.mark.execute_serially def test_recover_partitions(self, vector): @@ -81,8 +80,9 @@ class TestRecoverPartitions(ImpalaTestSuite): # Create a path for a new partition using hdfs client and add a file with some values. # Test that the partition can be recovered and that the inserted data are accessible. - self.hdfs_client.make_dir(self.BASE_DIR + leaf_dir) - self.hdfs_client.create_file(self.BASE_DIR + leaf_dir + file_path, inserted_value) + self.filesystem_client.make_dir(self.BASE_DIR + leaf_dir) + self.filesystem_client.create_file(self.BASE_DIR + leaf_dir + file_path, + inserted_value) result = self.execute_query_expect_success(self.client, "SHOW PARTITIONS %s" % (self.TEST_TBL)) assert self.has_value(part_name, result.data) == False @@ -102,7 +102,7 @@ class TestRecoverPartitions(ImpalaTestSuite): result = self.execute_query_expect_success(self.client, "SHOW PARTITIONS %s" % (self.TEST_TBL)) old_length = len(result.data) - self.hdfs_client.make_dir(self.BASE_DIR + malformed_dir) + self.filesystem_client.make_dir(self.BASE_DIR + malformed_dir) self.execute_query_expect_success(self.client, "ALTER TABLE %s RECOVER PARTITIONS" % (self.TEST_TBL)) result = self.execute_query_expect_success(self.client, @@ -113,8 +113,9 @@ class TestRecoverPartitions(ImpalaTestSuite): # Create a directory whose subdirectory names contain __HIVE_DEFAULT_PARTITION__ # and check that is recovered as a NULL partition. 
- self.hdfs_client.make_dir(self.BASE_DIR + null_dir) - self.hdfs_client.create_file(self.BASE_DIR + null_dir + file_path, null_inserted_value) + self.filesystem_client.make_dir(self.BASE_DIR + null_dir) + self.filesystem_client.create_file( + self.BASE_DIR + null_dir + file_path, null_inserted_value) result = self.execute_query_expect_success(self.client, "SHOW PARTITIONS %s" % (self.TEST_TBL)) assert self.has_value(self.DEF_NULL_PART_KEY, result.data) == False @@ -129,7 +130,6 @@ class TestRecoverPartitions(ImpalaTestSuite): "select c from %s" % self.TEST_TBL) assert self.has_value(null_inserted_value, result.data) == True - @SkipIfS3.insert @SkipIfLocal.hdfs_client @pytest.mark.execute_serially def test_nondefault_location_partitions(self, vector): @@ -149,9 +149,10 @@ class TestRecoverPartitions(ImpalaTestSuite): self.execute_query_expect_success(self.client, "ALTER TABLE %s PARTITION (i=1, p='p3') SET LOCATION '%s/%s.db/tmp' " % (self.TEST_TBL, WAREHOUSE, self.TEST_DB)) - self.hdfs_client.delete_file_dir(self.BASE_DIR + leaf_dir, recursive=True) - self.hdfs_client.make_dir(self.BASE_DIR + leaf_dir); - self.hdfs_client.create_file(self.BASE_DIR + leaf_dir + file_path, inserted_value) + self.filesystem_client.delete_file_dir(self.BASE_DIR + leaf_dir, recursive=True) + self.filesystem_client.make_dir(self.BASE_DIR + leaf_dir); + self.filesystem_client.create_file(self.BASE_DIR + leaf_dir + file_path, + inserted_value) self.execute_query_expect_success(self.client, "ALTER TABLE %s RECOVER PARTITIONS" % (self.TEST_TBL)) # Ensure that no duplicate partitions are recovered. @@ -166,7 +167,6 @@ class TestRecoverPartitions(ImpalaTestSuite): "select c from %s" % self.TEST_TBL) assert self.has_value(inserted_value, result.data) == True - @SkipIfS3.insert @SkipIfLocal.hdfs_client @pytest.mark.execute_serially def test_duplicate_partitions(self, vector): @@ -184,14 +184,15 @@ class TestRecoverPartitions(ImpalaTestSuite): # Create a partition with path "/i=1/p=p4". # Create a path "/i=0001/p=p4" using hdfs client, and add a file with some values. - # Test that no new partition will be recovered and the inserted data are not accessible. + # Test that no new partition will be recovered and the inserted data are not + # accessible. leaf_dir = "i=0001/p=p4/" inserted_value = "5" self.execute_query_expect_success(self.client, "ALTER TABLE %s ADD PARTITION(i=1, p='p4')" % (self.TEST_TBL)) - self.hdfs_client.make_dir(self.BASE_DIR + leaf_dir); - self.hdfs_client.create_file(self.BASE_DIR + leaf_dir + file_path, inserted_value) + self.filesystem_client.make_dir(self.BASE_DIR + leaf_dir); + self.filesystem_client.create_file(self.BASE_DIR + leaf_dir + file_path, inserted_value) self.execute_query_expect_success(self.client, "ALTER TABLE %s RECOVER PARTITIONS" % (self.TEST_TBL)) result = self.execute_query_expect_success(self.client, @@ -205,8 +206,8 @@ class TestRecoverPartitions(ImpalaTestSuite): result = self.execute_query_expect_success(self.client, "SHOW PARTITIONS %s" % (self.TEST_TBL)) old_length = len(result.data) - self.hdfs_client.make_dir(self.BASE_DIR + same_value_dir1) - self.hdfs_client.make_dir(self.BASE_DIR + same_value_dir2) + self.filesystem_client.make_dir(self.BASE_DIR + same_value_dir1) + self.filesystem_client.make_dir(self.BASE_DIR + same_value_dir2) # Only one partition will be added. 
self.execute_query_expect_success(self.client, "ALTER TABLE %s RECOVER PARTITIONS" % (self.TEST_TBL)) @@ -216,7 +217,6 @@ class TestRecoverPartitions(ImpalaTestSuite): "ALTER TABLE %s RECOVER PARTITIONS failed to handle duplicate partition key values." % (self.TEST_TBL)) - @SkipIfS3.insert @SkipIfLocal.hdfs_client @pytest.mark.execute_serially def test_post_invalidate(self, vector): @@ -233,8 +233,9 @@ class TestRecoverPartitions(ImpalaTestSuite): # Test that the recovered partitions are properly stored in Hive MetaStore. # Invalidate the table metadata and then check if the recovered partitions # are accessible. - self.hdfs_client.make_dir(self.BASE_DIR + leaf_dir); - self.hdfs_client.create_file(self.BASE_DIR + leaf_dir + file_path, inserted_value) + self.filesystem_client.make_dir(self.BASE_DIR + leaf_dir); + self.filesystem_client.create_file(self.BASE_DIR + leaf_dir + file_path, + inserted_value) self.execute_query_expect_success(self.client, "ALTER TABLE %s RECOVER PARTITIONS" % (self.TEST_TBL)) result = self.execute_query_expect_success(self.client, @@ -252,7 +253,6 @@ class TestRecoverPartitions(ImpalaTestSuite): "select c from %s" % self.TEST_TBL) assert self.has_value('4', result.data) == True - @SkipIfS3.insert @SkipIfLocal.hdfs_client @pytest.mark.execute_serially def test_support_all_types(self, vector): @@ -279,7 +279,7 @@ class TestRecoverPartitions(ImpalaTestSuite): "SHOW PARTITIONS %s" % (self.TEST_TBL2)) old_length = len(result.data) normal_dir = '/'.join(normal_values) - self.hdfs_client.make_dir(self.BASE_DIR2 + normal_dir) + self.filesystem_client.make_dir(self.BASE_DIR2 + normal_dir) # One partition will be added. self.execute_query_expect_success(self.client, "ALTER TABLE %s RECOVER PARTITIONS" % (self.TEST_TBL2)) @@ -308,7 +308,7 @@ class TestRecoverPartitions(ImpalaTestSuite): invalid_dir += (normal_values[j] + "/") else: invalid_dir += (invalid_values[j] + "/") - self.hdfs_client.make_dir(self.BASE_DIR2 + invalid_dir) + self.filesystem_client.make_dir(self.BASE_DIR2 + invalid_dir) # No partition will be added. 
self.execute_query_expect_success(self.client, "ALTER TABLE %s RECOVER PARTITIONS" % (self.TEST_TBL2)) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/metadata/test_show_create_table.py ---------------------------------------------------------------------- diff --git a/tests/metadata/test_show_create_table.py b/tests/metadata/test_show_create_table.py index c9dd5aa..fdd868d 100644 --- a/tests/metadata/test_show_create_table.py +++ b/tests/metadata/test_show_create_table.py @@ -47,7 +47,6 @@ class TestShowCreateTable(ImpalaTestSuite): v.get_value('table_format').file_format == 'text' and v.get_value('table_format').compression_codec == 'none') - @SkipIfS3.insert def test_show_create_table(self, vector, unique_database): self.__run_show_create_table_test_case('QueryTest/show-create-table', vector, unique_database) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_aggregation.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_aggregation.py b/tests/query_test/test_aggregation.py index 99bfb22..0d113a7 100644 --- a/tests/query_test/test_aggregation.py +++ b/tests/query_test/test_aggregation.py @@ -94,7 +94,6 @@ class TestAggregationQueries(ImpalaTestSuite): if cls.exploration_strategy() == 'core': cls.TestMatrix.add_dimension(create_uncompressed_text_dimension(cls.get_workload())) - @SkipIfS3.insert @pytest.mark.execute_serially def test_non_codegen_tinyint_grouping(self, vector): # Regression for IMPALA-901. The test includes an INSERT statement, so can only be run http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_cancellation.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_cancellation.py b/tests/query_test/test_cancellation.py index ea619e3..729ed3b 100644 --- a/tests/query_test/test_cancellation.py +++ b/tests/query_test/test_cancellation.py @@ -157,7 +157,6 @@ class TestCancellationSerial(TestCancellation): cls.TestMatrix.add_constraint(lambda v: v.get_value('cancel_delay') in [3]) cls.TestMatrix.add_constraint(lambda v: v.get_value('query') == choice(QUERIES)) - @SkipIfS3.insert @pytest.mark.execute_serially def test_cancel_insert(self, vector): self.execute_cancel_test(vector) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_chars.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_chars.py b/tests/query_test/test_chars.py index 11381b3..a1bfe58 100644 --- a/tests/query_test/test_chars.py +++ b/tests/query_test/test_chars.py @@ -9,7 +9,6 @@ from tests.common.impala_test_suite import * from tests.common.skip import SkipIfS3 from tests.util.filesystem_utils import WAREHOUSE [email protected] class TestStringQueries(ImpalaTestSuite): @classmethod def get_workload(cls): http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_compressed_formats.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_compressed_formats.py b/tests/query_test/test_compressed_formats.py index a50fafe..15f799c 100644 --- a/tests/query_test/test_compressed_formats.py +++ b/tests/query_test/test_compressed_formats.py @@ -111,7 +111,6 @@ class TestCompressedFormats(ImpalaTestSuite): finally: call(["hive", "-e", drop_cmd]); [email protected] class TestTableWriters(ImpalaTestSuite): @classmethod def 
get_workload(cls): @@ -143,7 +142,6 @@ class TestTableWriters(ImpalaTestSuite): pytest.skip() self.run_test_case('QueryTest/text-writer', vector) [email protected] @pytest.mark.execute_serially class TestLargeCompressedFile(ImpalaTestSuite): """ Tests that we gracefully handle when a compressed file in HDFS is larger http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_delimited_text.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_delimited_text.py b/tests/query_test/test_delimited_text.py index 864b411..371b7f4 100644 --- a/tests/query_test/test_delimited_text.py +++ b/tests/query_test/test_delimited_text.py @@ -43,11 +43,9 @@ class TestDelimitedText(ImpalaTestSuite): self.client.execute('drop table if exists %s.nl_queries' % self.TEST_DB_NAME) self.client.execute('drop database if exists %s' % self.TEST_DB_NAME) - @SkipIfS3.insert def test_delimited_text(self, vector): self.run_test_case('QueryTest/delimited-text', vector) - @SkipIfS3.insert def test_delimited_text_newlines(self, vector): """ Test text with newlines in strings - IMPALA-1943. Execute queries from Python to avoid issues with newline handling in test file format. """ @@ -73,7 +71,6 @@ class TestDelimitedText(ImpalaTestSuite): assert len(result.data) == 1 assert result.data[0] == "2" - @SkipIfS3.insert @pytest.mark.execute_serially def test_delimited_text_latin_chars(self, vector): """Verifies Impala is able to properly handle delimited text that contains http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_insert.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_insert.py b/tests/query_test/test_insert.py index 981520f..6929fca 100644 --- a/tests/query_test/test_insert.py +++ b/tests/query_test/test_insert.py @@ -15,7 +15,6 @@ from tests.util.filesystem_utils import IS_LOCAL # TODO: Add Gzip back. 
IMPALA-424 PARQUET_CODECS = ['none', 'snappy'] [email protected] class TestInsertQueries(ImpalaTestSuite): @classmethod def get_workload(self): @@ -125,7 +124,6 @@ class TestInsertWideTable(ImpalaTestSuite): actual = QueryTestResult(parse_result_rows(result), types, labels, order_matters=False) assert expected == actual [email protected] class TestInsertPartKey(ImpalaTestSuite): """Regression test for IMPALA-875""" @classmethod @@ -151,7 +149,6 @@ class TestInsertPartKey(ImpalaTestSuite): self.run_test_case('QueryTest/insert_part_key', vector, multiple_impalad=vector.get_value('exec_option')['sync_ddl'] == 1) [email protected] class TestInsertNullQueries(ImpalaTestSuite): @classmethod def get_workload(self): http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_insert_behaviour.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_insert_behaviour.py b/tests/query_test/test_insert_behaviour.py index d6c344b..a03ce00 100644 --- a/tests/query_test/test_insert_behaviour.py +++ b/tests/query_test/test_insert_behaviour.py @@ -22,8 +22,6 @@ from tests.common.parametrize import UniqueDatabase from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal from tests.util.filesystem_utils import WAREHOUSE, get_fs_path - [email protected] @SkipIfLocal.hdfs_client class TestInsertBehaviour(ImpalaTestSuite): """Tests for INSERT behaviour that isn't covered by checking query results""" @@ -45,12 +43,12 @@ class TestInsertBehaviour(ImpalaTestSuite): def test_insert_removes_staging_files(self): TBL_NAME = "insert_overwrite_nopart" insert_staging_dir = ("test-warehouse/functional.db/%s/" - "_impala_insert_staging" % TBL_NAME) - self.hdfs_client.delete_file_dir(insert_staging_dir, recursive=True) - self.client.execute("INSERT OVERWRITE functional.%s" - " SELECT int_col FROM functional.tinyinttable" % TBL_NAME) - ls = self.hdfs_client.list_dir(insert_staging_dir) - assert len(ls['FileStatuses']['FileStatus']) == 0 + "_impala_insert_staging" % TBL_NAME) + self.filesystem_client.delete_file_dir(insert_staging_dir, recursive=True) + self.client.execute("INSERT OVERWRITE functional.%s " + "SELECT int_col FROM functional.tinyinttable" % TBL_NAME) + ls = self.filesystem_client.ls(insert_staging_dir) + assert len(ls) == 0 @pytest.mark.execute_serially def test_insert_preserves_hidden_files(self): @@ -59,27 +57,28 @@ class TestInsertBehaviour(ImpalaTestSuite): table_dir = "test-warehouse/functional.db/%s/" % TBL_NAME hidden_file_locations = [".hidden", "_hidden"] dir_locations = ["dir", ".hidden_dir"] + for dir_ in dir_locations: - self.hdfs_client.make_dir(table_dir + dir_) + self.filesystem_client.make_dir(table_dir + dir_) + + # We do this here because the above 'make_dir' call doesn't make a directory for S3. 
+ for dir_ in dir_locations: + self.filesystem_client.create_file( + table_dir + dir_ + '/' + hidden_file_locations[0] , '', overwrite=True) + for file_ in hidden_file_locations: - self.hdfs_client.create_file(table_dir + file_, '', overwrite=True) + self.filesystem_client.create_file(table_dir + file_, '', overwrite=True) self.client.execute("INSERT OVERWRITE functional.%s" " SELECT int_col FROM functional.tinyinttable" % TBL_NAME) for file_ in hidden_file_locations: - try: - self.hdfs_client.get_file_dir_status(table_dir + file_) - except: - err_msg = "Hidden file '%s' was unexpectedly deleted by INSERT OVERWRITE" - pytest.fail(err_msg % (table_dir + file_)) + assert self.filesystem_client.exists(table_dir + file_), "Hidden file {0} was " \ + "unexpectedly deleted by INSERT OVERWRITE".format(table_dir + file_) for dir_ in dir_locations: - try: - self.hdfs_client.get_file_dir_status(table_dir + file_) - except: - err_msg = "Directory '%s' was unexpectedly deleted by INSERT OVERWRITE" - pytest.fail(err_msg % (table_dir + dir_)) + assert self.filesystem_client.exists(table_dir + dir_), "Directory {0} was " \ + "unexpectedly deleted by INSERT OVERWRITE".format(table_dir + dir_) @UniqueDatabase.parametrize(name_prefix='test_insert_alter_partition_location_db') def test_insert_alter_partition_location(self, unique_database): @@ -90,7 +89,7 @@ class TestInsertBehaviour(ImpalaTestSuite): table_name = "`{0}`.`insert_alter_partition_location`".format(unique_database) self.execute_query_expect_success(self.client, "DROP TABLE IF EXISTS %s" % table_name) - self.hdfs_client.delete_file_dir(part_dir, recursive=True) + self.filesystem_client.delete_file_dir(part_dir, recursive=True) self.execute_query_expect_success( self.client, @@ -113,9 +112,9 @@ class TestInsertBehaviour(ImpalaTestSuite): # Should have created the partition dir, which should contain exactly one file (not in # a subdirectory) - ls = self.hdfs_client.list_dir(part_dir) - assert len(ls['FileStatuses']['FileStatus']) == 1 + assert len(self.filesystem_client.ls(part_dir)) == 1 + @SkipIfS3.hdfs_acls @SkipIfIsilon.hdfs_acls @pytest.mark.xfail(run=False, reason="Fails intermittently on test clusters") @pytest.mark.execute_serially @@ -175,6 +174,7 @@ class TestInsertBehaviour(ImpalaTestSuite): "PARTITION(p1=1, p2=2, p3=30) VALUES(1)") check_has_acls("p1=1/p2=2/p3=30", "default:group:new_leaf_group:-w-") + @SkipIfS3.hdfs_acls @SkipIfIsilon.hdfs_acls def test_insert_file_permissions(self, unique_database): """Test that INSERT correctly respects file permission (minimum ACLs)""" @@ -224,6 +224,7 @@ class TestInsertBehaviour(ImpalaTestSuite): # Should be writable because 'other' ACLs allow writes self.execute_query_expect_success(self.client, insert_query) + @SkipIfS3.hdfs_acls @SkipIfIsilon.hdfs_acls def test_insert_acl_permissions(self, unique_database): """Test that INSERT correctly respects ACLs""" @@ -300,6 +301,7 @@ class TestInsertBehaviour(ImpalaTestSuite): # Should be writable because 'other' ACLs allow writes self.execute_query_expect_success(self.client, insert_query) + @SkipIfS3.hdfs_acls @SkipIfIsilon.hdfs_acls def test_load_permissions(self, unique_database): # We rely on test_insert_acl_permissions() to exhaustively check that ACL semantics @@ -357,13 +359,9 @@ class TestInsertBehaviour(ImpalaTestSuite): """Test insert/select query won't trigger partition directory or zero size data file creation if the resultset of select is empty.""" def check_path_exists(path, should_exist): - try: - self.hdfs_client.get_file_dir_status(path) 
- if not should_exist: - pytest.fail("file/dir '%s' unexpectedly exists" % path) - except Exception: - if should_exist: - pytest.fail("file/dir '%s' does not exist" % path) + assert self.filesystem_client.exists(path) == should_exist, "file/dir '{0}' " \ + "should {1}exist but does {2}exist.".format( + path, '' if should_exist else 'not ', 'not ' if should_exist else '') db_path = "test-warehouse/%s.db/" % self.TEST_DB_NAME table_path = db_path + "test_insert_empty_result" @@ -382,8 +380,8 @@ class TestInsertBehaviour(ImpalaTestSuite): insert_query = ("INSERT INTO TABLE {0} PARTITION(year=2009, month=1)" "select 1, 1 from {0} LIMIT 0".format(table_name)) self.execute_query_expect_success(self.client, insert_query) - # Partition directory should not be created - check_path_exists(partition_path, False) + # Partition directory is created + check_path_exists(partition_path, True) # Insert one record insert_query_one_row = ("INSERT INTO TABLE %s PARTITION(year=2009, month=1) " @@ -391,24 +389,24 @@ class TestInsertBehaviour(ImpalaTestSuite): self.execute_query_expect_success(self.client, insert_query_one_row) # Partition directory should be created with one data file check_path_exists(partition_path, True) - ls = self.hdfs_client.list_dir(partition_path) - assert len(ls['FileStatuses']['FileStatus']) == 1 + ls = (self.filesystem_client.ls(partition_path)) + assert len(ls) == 1 # Run an insert/select statement that returns an empty resultset again self.execute_query_expect_success(self.client, insert_query) # No new data file should be created - new_ls = self.hdfs_client.list_dir(partition_path) - assert len(new_ls['FileStatuses']['FileStatus']) == 1 - assert new_ls['FileStatuses'] == ls['FileStatuses'] + new_ls = self.filesystem_client.ls(partition_path) + assert len(new_ls) == 1 + assert new_ls == ls # Run an insert overwrite/select that returns an empty resultset insert_query = ("INSERT OVERWRITE {0} PARTITION(year=2009, month=1)" " select 1, 1 from {0} LIMIT 0".format(table_name)) self.execute_query_expect_success(self.client, insert_query) # Data file should be deleted - new_ls2 = self.hdfs_client.list_dir(partition_path) - assert len(new_ls2['FileStatuses']['FileStatus']) == 0 - assert new_ls['FileStatuses'] != new_ls2['FileStatuses'] + new_ls2 = self.filesystem_client.ls(partition_path) + assert len(new_ls2) == 0 + assert new_ls != new_ls2 # Test for IMPALA-2008 insert overwrite to an empty table with empty dataset empty_target_tbl = "test_overwrite_with_empty_target" @@ -422,9 +420,10 @@ class TestInsertBehaviour(ImpalaTestSuite): # Delete target table directory, query should fail with # "No such file or directory" error target_table_path = "%s%s" % (db_path, empty_target_tbl) - self.hdfs_client.delete_file_dir(target_table_path, recursive=True) + self.filesystem_client.delete_file_dir(target_table_path, recursive=True) self.execute_query_expect_failure(self.client, insert_query) + @SkipIfS3.hdfs_acls @SkipIfIsilon.hdfs_acls def test_multiple_group_acls(self, unique_database): """Test that INSERT correctly respects multiple group ACLs""" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_insert_parquet.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py index 0fcf083..a3c5f35 100644 --- a/tests/query_test/test_insert_parquet.py +++ b/tests/query_test/test_insert_parquet.py @@ -18,7 +18,6 @@ PARQUET_CODECS = ['none', 'snappy'] # 
TODO: these tests take a while so we don't want to go through too many sizes but # we should in more exhaustive testing PARQUET_FILE_SIZES = [0, 32 * 1024 * 1024] [email protected] class TestInsertParquetQueries(ImpalaTestSuite): @classmethod def get_workload(self): @@ -57,7 +56,6 @@ class TestInsertParquetQueries(ImpalaTestSuite): vector.get_value('compression_codec') self.run_test_case('insert_parquet', vector, multiple_impalad=True) [email protected] class TestInsertParquetInvalidCodec(ImpalaTestSuite): @classmethod def get_workload(self): @@ -88,7 +86,6 @@ class TestInsertParquetInvalidCodec(ImpalaTestSuite): multiple_impalad=True) [email protected] class TestInsertParquetVerifySize(ImpalaTestSuite): @classmethod def get_workload(self): @@ -136,13 +133,10 @@ class TestInsertParquetVerifySize(ImpalaTestSuite): # Get the files in hdfs and verify. There can be at most 1 file that is smaller # that the BLOCK_SIZE. The rest should be within 80% of it and not over. found_small_file = False - ls = self.hdfs_client.list_dir(DIR) - for f in ls['FileStatuses']['FileStatus']: - if f['type'] != 'FILE': - continue - length = f['length'] - print length - assert length < BLOCK_SIZE - if length < BLOCK_SIZE * 0.80: + sizes = self.filesystem_client.get_all_file_sizes(DIR) + for size in sizes: + assert size < BLOCK_SIZE, "File size greater than expected.\ + Expected: {0}, Got: {1}".format(BLOCK_SIZE, size) + if size < BLOCK_SIZE * 0.80: assert found_small_file == False found_small_file = True http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_insert_permutation.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_insert_permutation.py b/tests/query_test/test_insert_permutation.py index dd34e09..cac7336 100644 --- a/tests/query_test/test_insert_permutation.py +++ b/tests/query_test/test_insert_permutation.py @@ -30,7 +30,6 @@ class TestInsertQueriesWithPermutation(ImpalaTestSuite): cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[0])) cls.TestMatrix.add_dimension(create_uncompressed_text_dimension(cls.get_workload())) - @SkipIfS3.insert def test_insert_permutation(self, vector): map(self.cleanup_db, ["insert_permutation_test"]) self.run_test_case('QueryTest/insert_permutation', vector) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_join_queries.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_join_queries.py b/tests/query_test/test_join_queries.py index f617ffd..2729b31 100644 --- a/tests/query_test/test_join_queries.py +++ b/tests/query_test/test_join_queries.py @@ -96,7 +96,6 @@ class TestTPCHJoinQueries(ImpalaTestSuite): new_vector.get_value('exec_option')['batch_size'] = vector.get_value('batch_size') self.run_test_case('tpch-outer-joins', new_vector) [email protected] class TestSemiJoinQueries(ImpalaTestSuite): @classmethod def get_workload(cls): http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_multiple_filesystems.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_multiple_filesystems.py b/tests/query_test/test_multiple_filesystems.py index 1ea2b50..926c247 100644 --- a/tests/query_test/test_multiple_filesystems.py +++ b/tests/query_test/test_multiple_filesystems.py @@ -8,7 +8,6 @@ from tests.common.test_dimensions import create_single_exec_option_dimension from tests.common.skip 
import SkipIf, SkipIfIsilon, SkipIfS3 from tests.util.filesystem_utils import get_fs_path [email protected] @SkipIf.default_fs # Run only when a non-default filesystem is available. @SkipIfIsilon.untriaged # Missing coverage: Find out why this is failing. class TestMultipleFilesystems(ImpalaTestSuite): http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_partitioning.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_partitioning.py b/tests/query_test/test_partitioning.py index cb74bb0..7e74b60 100644 --- a/tests/query_test/test_partitioning.py +++ b/tests/query_test/test_partitioning.py @@ -45,7 +45,7 @@ class TestPartitioning(ImpalaTestSuite): def setup_class(cls): super(TestPartitioning, cls).setup_class() map(cls.cleanup_db, cls.TEST_DBS) - cls.hdfs_client.delete_file_dir("test-warehouse/all_insert_partition_col_types/",\ + cls.filesystem_client.delete_file_dir("test-warehouse/all_insert_partition_col_types/",\ recursive=True) @classmethod @@ -53,7 +53,6 @@ class TestPartitioning(ImpalaTestSuite): map(cls.cleanup_db, cls.TEST_DBS) super(TestPartitioning, cls).teardown_class() - @SkipIfS3.insert @SkipIfLocal.root_path @pytest.mark.execute_serially def test_partition_col_types(self, vector): @@ -67,7 +66,6 @@ class TestPartitioning(ImpalaTestSuite): @SkipIfIsilon.hive @SkipIfLocal.hive @pytest.mark.execute_serially - @SkipIfS3.insert def test_boolean_partitions(self, vector): # This test takes about a minute to complete due to the Hive commands that are # executed. To cut down on runtime, limit the test to exhaustive exploration http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_queries.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_queries.py b/tests/query_test/test_queries.py index 016c730..71fd21c 100644 --- a/tests/query_test/test_queries.py +++ b/tests/query_test/test_queries.py @@ -146,7 +146,6 @@ class TestQueriesTextTables(ImpalaTestSuite): def test_mixed_format(self, vector): self.run_test_case('QueryTest/mixed-format', vector) - @SkipIfS3.insert def test_values(self, vector): self.run_test_case('QueryTest/values', vector) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_scanners.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py index e64bf0c..15181fc 100644 --- a/tests/query_test/test_scanners.py +++ b/tests/query_test/test_scanners.py @@ -281,7 +281,6 @@ class TestParquet(ImpalaTestSuite): for scan_ranges_complete in scan_ranges_complete_list: assert int(scan_ranges_complete) == ranges_per_node - @SkipIfS3.insert def test_annotate_utf8_option(self, vector, unique_database): if self.exploration_strategy() != 'exhaustive': pytest.skip("Only run in exhaustive") @@ -338,7 +337,6 @@ class TestParquet(ImpalaTestSuite): assert c_schema_elt.converted_type == ConvertedType.UTF8 assert d_schema_elt.converted_type == None - @SkipIfS3.insert @SkipIfOldAggsJoins.nested_types def test_resolution_by_name(self, unique_database, vector): self.run_test_case('QueryTest/parquet-resolution-by-name', vector, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/shell/test_shell_commandline.py ---------------------------------------------------------------------- diff --git a/tests/shell/test_shell_commandline.py 
b/tests/shell/test_shell_commandline.py index d88bf11..e06f7eb 100644 --- a/tests/shell/test_shell_commandline.py +++ b/tests/shell/test_shell_commandline.py @@ -35,7 +35,6 @@ TEST_DB = "tmp_shell" TEST_TBL = "tbl1" QUERY_FILE_PATH = os.path.join(os.environ['IMPALA_HOME'], 'tests', 'shell') -@SkipIfS3.insert class TestImpalaShell(object): """A set of sanity tests for the Impala shell commandline parameters. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/util/filesystem_base.py ---------------------------------------------------------------------- diff --git a/tests/util/filesystem_base.py b/tests/util/filesystem_base.py new file mode 100644 index 0000000..da9458b --- /dev/null +++ b/tests/util/filesystem_base.py @@ -0,0 +1,61 @@ +# Copyright (c) 2016 Cloudera, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Filesystem access abstraction + +from abc import ABCMeta, abstractmethod + +class BaseFilesystem(object): + __metaclass__ = ABCMeta + + @abstractmethod + def create_file(self, path, file_data, overwrite): + """Create a file in 'path' and populate with the string 'file_data'. If overwrite is + True, the file is overwritten. Returns True if successful, False if the file already + exists and throws an exception otherwise""" + pass + + @abstractmethod + def make_dir(self, path, permission): + """Create a directory in 'path' with octal umask 'permission'. + Returns True if successful and throws an exception otherwise""" + pass + + @abstractmethod + def copy(self, src, dst): + """Copy a file from 'src' to 'dst'. Throws an exception if unsuccessful.""" + pass + + @abstractmethod + def ls(self, path): + """Return a list of all files/dirs/keys in path. Throws an exception if path + is invalid.""" + pass + + @abstractmethod + def exists(self, path): + """Returns True if a particular path exists, else it returns False.""" + pass + + @abstractmethod + def delete_file_dir(self, path, recursive): + """Delete all files/dirs/keys in a path. Returns True if successful or if the file + does not exist. Throws an exception otherwise.""" + pass + + @abstractmethod + def get_all_file_sizes(self, path): + """Returns a list of integers which are all the file sizes of files found under + 'path'.""" + pass http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/util/filesystem_utils.py ---------------------------------------------------------------------- diff --git a/tests/util/filesystem_utils.py b/tests/util/filesystem_utils.py index 0f60a37..d22ae00 100644 --- a/tests/util/filesystem_utils.py +++ b/tests/util/filesystem_utils.py @@ -29,6 +29,9 @@ IS_DEFAULT_FS = not FILESYSTEM_PREFIX or IS_LOCAL # Isilon specific values. 
ISILON_WEBHDFS_PORT = 8082 +# S3 specific values +S3_BUCKET_NAME = os.getenv("S3_BUCKET") + def get_fs_path(path): return "%s%s" % (FILESYSTEM_PREFIX, path) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/util/hdfs_util.py ---------------------------------------------------------------------- diff --git a/tests/util/hdfs_util.py b/tests/util/hdfs_util.py index 7ca83aa..7870666 100644 --- a/tests/util/hdfs_util.py +++ b/tests/util/hdfs_util.py @@ -18,12 +18,13 @@ from os import environ from os.path import join as join_path from pywebhdfs.webhdfs import PyWebHdfsClient, errors, _raise_pywebhdfs_exception from xml.etree.ElementTree import parse +from tests.util.filesystem_base import BaseFilesystem import getpass import httplib import requests import types -class PyWebHdfsClientWithChmod(PyWebHdfsClient): +class PyWebHdfsClientWithChmod(PyWebHdfsClient, BaseFilesystem): def chmod(self, path, permission): """Set the permission of 'path' to 'permission' (specified as an octal string, e.g. '775'""" @@ -61,9 +62,7 @@ class PyWebHdfsClientWithChmod(PyWebHdfsClient): Overrides the superclass's method by providing delete if exists semantics. This takes the burden of stat'ing the file away from the caller. """ - try: - self.get_file_dir_status(path) - except Exception as e: + if not self.exists(path): return True return super(PyWebHdfsClientWithChmod, self).delete_file_dir(path, recursive=recursive) @@ -77,6 +76,14 @@ class PyWebHdfsClientWithChmod(PyWebHdfsClient): path = path.lstrip('/') return super(PyWebHdfsClientWithChmod, self).get_file_dir_status(path) + def get_all_file_sizes(self, path): + """Returns a list of all file sizes in the path""" + sizes = [] + for status in self.list_dir(path).get('FileStatuses').get('FileStatus'): + if status['type'] == 'FILE': + sizes += [status['length']] + return sizes + def copy(self, src, dest): """Copies a file in hdfs from src to destination @@ -92,8 +99,21 @@ class PyWebHdfsClientWithChmod(PyWebHdfsClient): self.create_file(dest, data) assert self.get_file_dir_status(dest) assert self.read_file(dest) == data - return True + def ls(self, path): + """Returns a list of all file and directory names in 'path'""" + # list_dir() returns a dictionary of file statues. This function picks out the + # file and directory names and just returns a list of the names. + file_infos = self.list_dir(path).get('FileStatuses').get('FileStatus') + return [info.get('pathSuffix') for info in file_infos] + + def exists(self, path): + """Checks if a particular path exists""" + try: + self.get_file_dir_status(path) + except errors.FileNotFound: + return False + return True class HdfsConfig(object): """Reads an XML configuration file (produced by a mini-cluster) into a dictionary @@ -116,23 +136,9 @@ def get_hdfs_client_from_conf(conf): host, port = hostport.split(":") return get_hdfs_client(host=host, port=port) -def _pyweb_hdfs_client_exists(self, path): - """The PyWebHdfsClient doesn't provide an API to cleanly detect if a file or directory - exists. This method is bound to each client that is created so tests can simply call - hdfs_client.exists('path') and get back a bool. 
- """ - try: - self.get_file_dir_status(path) - except errors.FileNotFound: - return False - return True - def get_hdfs_client(host, port, user_name=getpass.getuser()): """Returns a new HTTP client for an HDFS cluster using an explict host:port pair""" - hdfs_client = PyWebHdfsClientWithChmod(host=host, port=port, user_name=user_name) - # Bind our "exists" method to hdfs_client.exists - hdfs_client.exists = types.MethodType(_pyweb_hdfs_client_exists, hdfs_client) - return hdfs_client + return PyWebHdfsClientWithChmod(host=host, port=port, user_name=user_name) def get_default_hdfs_config(): core_site_path = join_path(environ.get('HADOOP_CONF_DIR'), 'core-site.xml') http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/util/s3_util.py ---------------------------------------------------------------------- diff --git a/tests/util/s3_util.py b/tests/util/s3_util.py new file mode 100644 index 0000000..42e44e8 --- /dev/null +++ b/tests/util/s3_util.py @@ -0,0 +1,99 @@ +# Copyright (c) 2016 Cloudera, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# S3 access utilities +# +# This file uses the boto3 client and provides simple functions to the Impala test suite +# to access Amazon S3. + +import boto3 +import copy +from tests.util.filesystem_base import BaseFilesystem + +class S3Client(BaseFilesystem): + + @classmethod + def __init__(self, bucket): + self.bucketname = bucket + self.s3 = boto3.resource('s3') + self.bucket = self.s3.Bucket(self.bucketname) + self.s3client = boto3.client('s3') + + def create_file(self, path, file_data, overwrite=True): + if not overwrite and self.exists(path): return False + self.s3client.put_object(Bucket=self.bucketname, Key=path, Body=file_data) + return True + + def make_dir(self, path, permission=None): + # This function is a no-op. S3 is a key-value store and does not have a directory + # structure. We can use a non-existent path as though it already exists. + pass + + def copy(self, src, dst): + self.s3client.copy_object(Bucket=self.bucketname, + CopySource={'Bucket':self.bucketname, 'Key':src}, Key=dst) + + # Since S3 is a key-value store, it does not have a command like 'ls' for a directory + # structured filesystem. It lists everything under a path recursively. + # We have to manipulate its response to get an 'ls' like output. + def ls(self, path): + if not path.endswith('/'): + path += '/' + # Use '/' as a delimiter so that we don't get all keys under a path recursively. + response = self.s3client.list_objects( + Bucket=self.bucketname, Prefix=path, Delimiter='/') + dirs = [] + # Non-keys or "directories" will be listed as 'Prefix' under 'CommonPrefixes'. + if 'CommonPrefixes' in response: + dirs = [t['Prefix'] for t in response['CommonPrefixes']] + files = [] + # Keys or "files" will be listed as 'Key' under 'Contents'. 
+ if 'Contents' in response: + files = [t['Key'] for t in response['Contents']] + files_and_dirs = [] + files_and_dirs.extend([d.split('/')[-2] for d in dirs]) + for f in files: + key = f.split("/")[-1] + if not key == '': + files_and_dirs += [key] + return files_and_dirs + + def get_all_file_sizes(self, path): + if not path.endswith('/'): + path += '/' + # Use '/' as a delimiter so that we don't get all keys under a path recursively. + response = self.s3client.list_objects( + Bucket=self.bucketname, Prefix=path, Delimiter='/') + if 'Contents' in response: + return [t['Size'] for t in response['Contents']] + return [] + + def exists(self, path): + response = self.s3client.list_objects(Bucket=self.bucketname, Prefix=path) + return response.get('Contents') is not None + + # Helper function which lists keys in a path. Should not be used by the tests directly. + def _list_keys(self, path): + if not self.exists(path): + return False + response = self.s3client.list_objects(Bucket=self.bucketname, Prefix=path) + contents = response.get('Contents') + return [c['Key'] for c in contents] + + def delete_file_dir(self, path, recursive=False): + if not self.exists(path): + return True + objects = [{'Key': k} for k in self._list_keys(path)] if recursive else [{'Key': path}] + self.s3client.delete_objects(Bucket=self.bucketname, Delete={'Objects':objects}) + return True
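
The BaseFilesystem contract above is what lets tests talk to a single filesystem_client regardless of whether the target store is HDFS or S3. The wiring that chooses the concrete client inside ImpalaTestSuite is not part of the hunks shown here, so the sketch below is only an illustration of how such a selection could look and how the shared API is meant to be exercised; the TARGET_FILESYSTEM check and the make_client() helper are assumptions, not code from this patch.

# Illustrative sketch only: pick a concrete BaseFilesystem implementation and use the
# common API defined in filesystem_base.py. The TARGET_FILESYSTEM switch below is an
# assumption; the real test suite wires up 'filesystem_client' elsewhere.
import os

from tests.util.filesystem_utils import S3_BUCKET_NAME
from tests.util.hdfs_util import get_default_hdfs_config, get_hdfs_client_from_conf
from tests.util.s3_util import S3Client

def make_client():
  if os.getenv("TARGET_FILESYSTEM") == "s3":
    # The bucket name comes from the S3_BUCKET environment variable (see filesystem_utils).
    return S3Client(S3_BUCKET_NAME)
  # Otherwise fall back to the WebHDFS client built from core-site.xml.
  return get_hdfs_client_from_conf(get_default_hdfs_config())

client = make_client()
client.create_file("test-warehouse/example_dir/example.txt", file_data="hello",
                   overwrite=True)
assert client.exists("test-warehouse/example_dir/example.txt")
assert client.get_all_file_sizes("test-warehouse/example_dir") == [5]  # "hello" is 5 bytes
client.delete_file_dir("test-warehouse/example_dir", recursive=True)

Either client satisfies the same abstract methods, which is what allows the test changes above to swap self.hdfs_client for self.filesystem_client without touching the assertions.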

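Because S3 is a flat key-value store, S3Client.ls() has to synthesize a directory-style listing from the 'CommonPrefixes' and 'Contents' sections of the list_objects response. The walk-through below uses a made-up bucket layout purely for illustration:

# Suppose the bucket named by $S3_BUCKET currently holds these keys (hypothetical):
#   test-warehouse/t1/part-0000
#   test-warehouse/t1/part-0001
#   test-warehouse/t2/a.txt
from tests.util.filesystem_utils import S3_BUCKET_NAME
from tests.util.s3_util import S3Client

client = S3Client(S3_BUCKET_NAME)

# list_objects(Prefix='test-warehouse/', Delimiter='/') reports the two "directories"
# under 'CommonPrefixes' and no direct keys under 'Contents', so ls() strips the
# prefixes down to their trailing components, mimicking an HDFS-style listing.
assert sorted(client.ls("test-warehouse")) == ['t1', 't2']

# get_all_file_sizes() only inspects 'Contents', i.e. keys sitting directly under the
# prefix, and returns their sizes in bytes.
print client.get_all_file_sizes("test-warehouse/t2")  # e.g. [5] for a 5-byte a.txt

# delete_file_dir(recursive=True) collects every key sharing the prefix via _list_keys()
# and removes them in a single delete_objects call.
client.delete_file_dir("test-warehouse/t1", recursive=True)
assert not client.exists("test-warehouse/t1")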