Repository: incubator-impala
Updated Branches:
  refs/heads/master f915d59aa -> 14cdb0497


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/metadata/test_ddl.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index 9a38205..d40f77c 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -28,9 +28,9 @@ from tests.util.filesystem_utils import WAREHOUSE, IS_LOCAL
 
 # Validates DDL statements (create, drop)
 class TestDdlStatements(ImpalaTestSuite):
-  TEST_DBS = ['ddl_test_db', 'ddl_purge_db', 'alter_table_test_db', 'alter_table_test_db2',
-              'function_ddl_test', 'udf_test', 'data_src_test', 'truncate_table_test_db',
-              'test_db', 'alter_purge_db', 'db_with_comment']
+  TEST_DBS = ['ddl_test_db', 'ddl_purge_db', 'alter_table_test_db',
+              'alter_table_test_db2', 'function_ddl_test', 'udf_test', 'data_src_test',
+              'truncate_table_test_db', 'test_db', 'alter_purge_db', 'db_with_comment']
 
   @classmethod
   def get_workload(self):
@@ -65,9 +65,9 @@ class TestDdlStatements(ImpalaTestSuite):
    # time a table is created with the same location. This also helps remove any stale
     # data from the last test run.
     for dir_ in ['part_data', 't1_tmp1', 't_part_tmp']:
-      self.hdfs_client.delete_file_dir('test-warehouse/%s' % dir_, recursive=True)
+      self.filesystem_client.delete_file_dir('test-warehouse/%s' % dir_, recursive=True)
 
-  @SkipIfS3.hdfs_client # S3: missing coverage: drop table/partition with PURGE
+  @SkipIfS3.hdfs_purge
   @SkipIfLocal.hdfs_client
   @pytest.mark.execute_serially
   def test_drop_table_with_purge(self):
@@ -80,31 +80,31 @@ class TestDdlStatements(ImpalaTestSuite):
     self.client.execute("create table {0}.t2(i int)".format(DDL_PURGE_DB))
    # Create sample test data files under the table directories
    self.hdfs_client.create_file("test-warehouse/{0}.db/t1/t1.txt".format(DDL_PURGE_DB),\
-            file_data='t1')
+        file_data='t1')
    self.hdfs_client.create_file("test-warehouse/{0}.db/t2/t2.txt".format(DDL_PURGE_DB),\
-            file_data='t2')
+        file_data='t2')
     # Drop the table (without purge) and make sure it exists in trash
     self.client.execute("drop table {0}.t1".format(DDL_PURGE_DB))
     assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/t1.txt".\
-            format(DDL_PURGE_DB))
+        format(DDL_PURGE_DB))
    assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/".format(DDL_PURGE_DB))
     assert self.hdfs_client.exists(\
-            "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/t1.txt".\
-            format(getpass.getuser(), DDL_PURGE_DB))
+        "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/t1.txt".\
+        format(getpass.getuser(), DDL_PURGE_DB))
     assert self.hdfs_client.exists(\
-            "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1".\
-            format(getpass.getuser(), DDL_PURGE_DB))
+        "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1".\
+        format(getpass.getuser(), DDL_PURGE_DB))
     # Drop the table (with purge) and make sure it doesn't exist in trash
     self.client.execute("drop table {0}.t2 purge".format(DDL_PURGE_DB))
    assert not self.hdfs_client.exists("test-warehouse/{0}.db/t2/".format(DDL_PURGE_DB))
     assert not self.hdfs_client.exists("test-warehouse/{0}.db/t2/t2.txt".\
-            format(DDL_PURGE_DB))
+        format(DDL_PURGE_DB))
     assert not self.hdfs_client.exists(\
-            "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t2/t2.txt".\
-            format(getpass.getuser(), DDL_PURGE_DB))
+        "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t2/t2.txt".\
+        format(getpass.getuser(), DDL_PURGE_DB))
     assert not self.hdfs_client.exists(\
-            "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t2".\
-            format(getpass.getuser(), DDL_PURGE_DB))
+        "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t2".\
+        format(getpass.getuser(), DDL_PURGE_DB))
     # Create an external table t3 and run the same test as above. Make
     # sure the data is not deleted
     self.hdfs_client.make_dir("test-warehouse/data_t3/", permission=777)
@@ -115,101 +115,100 @@ class TestDdlStatements(ImpalaTestSuite):
     assert self.hdfs_client.exists("test-warehouse/data_t3/data.txt")
     self.hdfs_client.delete_file_dir("test-warehouse/data_t3", recursive=True)
 
-  @SkipIfS3.hdfs_client # S3: missing coverage: drop table/database
   @SkipIfLocal.hdfs_client
   @pytest.mark.execute_serially
   def test_drop_cleans_hdfs_dirs(self):
     DDL_TEST_DB = "ddl_test_db"
-    self.hdfs_client.delete_file_dir("test-warehouse/ddl_test_db.db/", 
recursive=True)
-    assert not self.hdfs_client.exists("test-warehouse/ddl_test_db.db/")
+    self.filesystem_client.delete_file_dir("test-warehouse/ddl_test_db.db/",
+                                           recursive=True)
+    assert not self.filesystem_client.exists("test-warehouse/ddl_test_db.db/")
 
     self.client.execute('use default')
     self._create_db(DDL_TEST_DB)
     # Verify the db directory exists
-    assert self.hdfs_client.exists("test-warehouse/{0}.db/".format(DDL_TEST_DB))
+    assert self.filesystem_client.exists("test-warehouse/{0}.db/".format(DDL_TEST_DB))
 
     self.client.execute("create table {0}.t1(i int)".format(DDL_TEST_DB))
     # Verify the table directory exists
-    assert self.hdfs_client.exists("test-warehouse/{0}.db/t1/".format(DDL_TEST_DB))
+    assert self.filesystem_client.exists("test-warehouse/{0}.db/t1/".format(DDL_TEST_DB))
 
    # Dropping the table removes the table's directory and preserves the db's directory
     self.client.execute("drop table {0}.t1".format(DDL_TEST_DB))
-    assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/".format(DDL_TEST_DB))
-    assert self.hdfs_client.exists("test-warehouse/{0}.db/".format(DDL_TEST_DB))
+    assert not self.filesystem_client.exists(
+        "test-warehouse/{0}.db/t1/".format(DDL_TEST_DB))
+    assert self.filesystem_client.exists("test-warehouse/{0}.db/".format(DDL_TEST_DB))
 
     # Dropping the db removes the db's directory
     self.client.execute("drop database {0}".format(DDL_TEST_DB))
-    assert not self.hdfs_client.exists("test-warehouse/{0}.db/".format(DDL_TEST_DB))
+    assert not self.filesystem_client.exists("test-warehouse/{0}.db/".format(DDL_TEST_DB))
 
     # Dropping the db using "cascade" removes all tables' and db's directories
     # but keeps the external tables' directory
     self._create_db(DDL_TEST_DB)
     self.client.execute("create table {0}.t1(i int)".format(DDL_TEST_DB))
     self.client.execute("create table {0}.t2(i int)".format(DDL_TEST_DB))
-    self.client.execute("create external table {0}.t3(i int) "
-                        "location '/test-warehouse/{0}/t3/'".format(DDL_TEST_DB))
+    result = self.client.execute("create external table {0}.t3(i int) "
+        "location '{1}/{0}/t3/'".format(DDL_TEST_DB, WAREHOUSE))
     self.client.execute("drop database {0} cascade".format(DDL_TEST_DB))
-    assert not self.hdfs_client.exists("test-warehouse/{0}.db/".format(DDL_TEST_DB))
-    assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/".format(DDL_TEST_DB))
-    assert not self.hdfs_client.exists("test-warehouse/{0}.db/t2/".format(DDL_TEST_DB))
-    assert self.hdfs_client.exists("test-warehouse/{0}/t3/".format(DDL_TEST_DB))
-    self.hdfs_client.delete_file_dir("test-warehouse/{0}/t3/".format(DDL_TEST_DB),
+    assert not self.filesystem_client.exists("test-warehouse/{0}.db/".format(DDL_TEST_DB))
+    assert not self.filesystem_client.exists(
+        "test-warehouse/{0}.db/t1/".format(DDL_TEST_DB))
+    assert not self.filesystem_client.exists(
+        "test-warehouse/{0}.db/t2/".format(DDL_TEST_DB))
+    assert self.filesystem_client.exists("test-warehouse/{0}/t3/".format(DDL_TEST_DB))
+    self.filesystem_client.delete_file_dir("test-warehouse/{0}/t3/".format(DDL_TEST_DB),
        recursive=True)
-    assert not self.hdfs_client.exists("test-warehouse/{0}/t3/".format(DDL_TEST_DB))
+    assert not self.filesystem_client.exists("test-warehouse/{0}/t3/".format(DDL_TEST_DB))
 
-  @SkipIfS3.insert # S3: missing coverage: truncate table
   @SkipIfLocal.hdfs_client
   @pytest.mark.execute_serially
   def test_truncate_cleans_hdfs_files(self):
     TRUNCATE_DB = "truncate_table_test_db"
-    self.hdfs_client.delete_file_dir("test-warehouse/%s.db/" % TRUNCATE_DB,
+    self.filesystem_client.delete_file_dir("test-warehouse/%s.db/" % 
TRUNCATE_DB,
         recursive=True)
-    assert not self.hdfs_client.exists("test-warehouse/%s.db/" % TRUNCATE_DB)
+    assert not self.filesystem_client.exists("test-warehouse/%s.db/" % 
TRUNCATE_DB)
 
     self._create_db(TRUNCATE_DB, sync=True)
     # Verify the db directory exists
-    assert self.hdfs_client.exists("test-warehouse/%s.db/" % TRUNCATE_DB)
+    assert self.filesystem_client.exists("test-warehouse/%s.db/" % TRUNCATE_DB)
 
     self.client.execute("create table %s.t1(i int)" % TRUNCATE_DB)
     # Verify the table directory exists
-    assert self.hdfs_client.exists("test-warehouse/truncate_table_test_db.db/t1/")
+    assert self.filesystem_client.exists("test-warehouse/truncate_table_test_db.db/t1/")
 
     # Should have created one file in the table's dir
     self.client.execute("insert into %s.t1 values (1)" % TRUNCATE_DB)
-    ls = self.hdfs_client.list_dir("test-warehouse/%s.db/t1/" % TRUNCATE_DB)
-    assert len(ls['FileStatuses']['FileStatus']) == 2
+    assert len(self.filesystem_client.ls("test-warehouse/%s.db/t1/" % TRUNCATE_DB)) == 2
 
    # Truncating the table removes the data files and preserves the table's directory
     self.client.execute("truncate table %s.t1" % TRUNCATE_DB)
-    ls = self.hdfs_client.list_dir("test-warehouse/%s.db/t1/" % TRUNCATE_DB)
-    assert len(ls['FileStatuses']['FileStatus']) == 1
+    assert len(self.filesystem_client.ls("test-warehouse/%s.db/t1/" % TRUNCATE_DB)) == 1
 
     self.client.execute(
         "create table %s.t2(i int) partitioned by (p int)" % TRUNCATE_DB)
     # Verify the table directory exists
-    assert self.hdfs_client.exists("test-warehouse/%s.db/t2/" % TRUNCATE_DB)
+    assert self.filesystem_client.exists("test-warehouse/%s.db/t2/" % 
TRUNCATE_DB)
 
    # Should have created the partition dir, which should contain exactly one file
     self.client.execute(
         "insert into %s.t2 partition(p=1) values (1)" % TRUNCATE_DB)
-    ls = self.hdfs_client.list_dir("test-warehouse/%s.db/t2/p=1" % TRUNCATE_DB)
-    assert len(ls['FileStatuses']['FileStatus']) == 1
+    assert len(self.filesystem_client.ls(
+        "test-warehouse/%s.db/t2/p=1" % TRUNCATE_DB)) == 1
 
    # Truncating the table removes the data files and preserves the partition's directory
     self.client.execute("truncate table %s.t2" % TRUNCATE_DB)
-    assert self.hdfs_client.exists("test-warehouse/%s.db/t2/p=1" % TRUNCATE_DB)
-    ls = self.hdfs_client.list_dir("test-warehouse/%s.db/t2/p=1" % TRUNCATE_DB)
-    assert len(ls['FileStatuses']['FileStatus']) == 0
+    assert self.filesystem_client.exists("test-warehouse/%s.db/t2/p=1" % 
TRUNCATE_DB)
+    assert len(self.filesystem_client.ls(
+        "test-warehouse/%s.db/t2/p=1" % TRUNCATE_DB)) == 0
 
-  @SkipIfS3.insert # S3: missing coverage: truncate table
   @pytest.mark.execute_serially
   def test_truncate_table(self, vector):
     vector.get_value('exec_option')['abort_on_error'] = False
     self._create_db('truncate_table_test_db', sync=True)
-    self.run_test_case('QueryTest/truncate-table', vector, use_db='truncate_table_test_db',
+    self.run_test_case('QueryTest/truncate-table', vector,
+        use_db='truncate_table_test_db',
         multiple_impalad=self._use_multiple_impalad(vector))
 
-  @SkipIfS3.insert
   @pytest.mark.execute_serially
   def test_create_table(self, vector):
     vector.get_value('exec_option')['abort_on_error'] = False
@@ -270,7 +269,6 @@ class TestDdlStatements(ImpalaTestSuite):
     assert 'test_tbl' in self.client.execute("show tables in test_db").data
 
   @SkipIf.kudu_not_supported
-  @SkipIfS3.insert
   @pytest.mark.execute_serially
   def test_create_kudu(self, vector):
     self.expected_exceptions = 2
@@ -291,15 +289,15 @@ class TestDdlStatements(ImpalaTestSuite):
     assert 'ddl_test_db' not in self.all_db_names()
 
   # TODO: don't use hdfs_client
-  @SkipIfS3.insert # S3: missing coverage: alter table
   @SkipIfLocal.hdfs_client
   @pytest.mark.execute_serially
   def test_alter_table(self, vector):
     vector.get_value('exec_option')['abort_on_error'] = False
     # Create directory for partition data that does not use the (key=value)
     # format.
-    self.hdfs_client.make_dir("test-warehouse/part_data/", permission=777)
-    self.hdfs_client.create_file("test-warehouse/part_data/data.txt", 
file_data='1984')
+    self.filesystem_client.make_dir("test-warehouse/part_data/", 
permission=777)
+    self.filesystem_client.create_file(
+        "test-warehouse/part_data/data.txt", file_data='1984')
 
     # Create test databases
     self._create_db('alter_table_test_db', sync=True)
@@ -307,7 +305,7 @@ class TestDdlStatements(ImpalaTestSuite):
    self.run_test_case('QueryTest/alter-table', vector, use_db='alter_table_test_db',
         multiple_impalad=self._use_multiple_impalad(vector))
 
-  @SkipIfS3.hdfs_client # S3: missing coverage: alter table drop partition
+  @SkipIfS3.hdfs_purge # S3: missing coverage: alter table drop partition
   @SkipIfLocal.hdfs_client
   @pytest.mark.execute_serially
   def test_drop_partition_with_purge(self, vector):
@@ -327,28 +325,28 @@ class TestDdlStatements(ImpalaTestSuite):
     # Drop the partition (j=1) without purge and make sure it exists in trash
     self.client.execute("alter table {0}.t1 drop 
partition(j=1)".format(ALTER_PURGE_DB));
     assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/j=1/j1.txt".\
-            format(ALTER_PURGE_DB))
+        format(ALTER_PURGE_DB))
     assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/j=1".\
-            format(ALTER_PURGE_DB))
+        format(ALTER_PURGE_DB))
     assert self.hdfs_client.exists(\
-            "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=1/j1.txt".\
-            format(getpass.getuser(), ALTER_PURGE_DB))
+        "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=1/j1.txt".\
+        format(getpass.getuser(), ALTER_PURGE_DB))
     assert self.hdfs_client.exists(\
-            "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=1".\
-            format(getpass.getuser(), ALTER_PURGE_DB))
+        "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=1".\
+        format(getpass.getuser(), ALTER_PURGE_DB))
     # Drop the partition (with purge) and make sure it doesn't exist in trash
     self.client.execute("alter table {0}.t1 drop partition(j=2) purge".\
-            format(ALTER_PURGE_DB));
+        format(ALTER_PURGE_DB));
     assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/j=2/j2.txt".\
-            format(ALTER_PURGE_DB))
+        format(ALTER_PURGE_DB))
     assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/j=2".\
-            format(ALTER_PURGE_DB))
+        format(ALTER_PURGE_DB))
     assert not self.hdfs_client.exists(\
-            "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=2".\
-            format(getpass.getuser(), ALTER_PURGE_DB))
+        "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=2".\
+        format(getpass.getuser(), ALTER_PURGE_DB))
     assert not self.hdfs_client.exists(
-            "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=2/j2.txt".\
-            format(getpass.getuser(), ALTER_PURGE_DB))
+        "/user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=2/j2.txt".\
+        format(getpass.getuser(), ALTER_PURGE_DB))
 
   @pytest.mark.execute_serially
   def test_views_ddl(self, vector):
@@ -434,11 +432,11 @@ class TestDdlStatements(ImpalaTestSuite):
 
  def create_drop_ddl(self, vector, db_name, create_stmts, drop_stmts, select_stmt,
       num_iterations=3):
-    """ Helper method to run CREATE/DROP DDL commands repeatedly and exercise 
the lib cache
-    create_stmts is the list of CREATE statements to be executed in order 
drop_stmts is
-    the list of DROP statements to be executed in order. Each statement should 
have a
-    '%s' placeholder to insert "IF EXISTS" or "". The select_stmt is just a 
single
-    statement to test after executing the CREATE statements.
+    """ Helper method to run CREATE/DROP DDL commands repeatedly and exercise 
the lib
+    cache create_stmts is the list of CREATE statements to be executed in order
+    drop_stmts is the list of DROP statements to be executed in order. Each 
statement
+    should have a '%s' placeholder to insert "IF EXISTS" or "". The 
select_stmt is just a
+    single statement to test after executing the CREATE statements.
     TODO: it's hard to tell that the cache is working (i.e. if it did nothing 
to drop
     the cache, these tests would still pass). Testing that is a bit harder and 
requires
     us to update the udf binary in the middle.
@@ -454,7 +452,6 @@ class TestDdlStatements(ImpalaTestSuite):
       self.client.execute(select_stmt)
       for drop_stmt in drop_stmts: self.client.execute(drop_stmt % (""))
 
-  @SkipIfS3.insert # S3: missing coverage: alter table partitions.
   @pytest.mark.execute_serially
   def test_create_alter_bulk_partition(self, vector):
     TBL_NAME = 'foo_part'
@@ -478,8 +475,8 @@ class TestDdlStatements(ImpalaTestSuite):
         " set fileformat parquetfile" % TBL_NAME)
 
     # Alter one partition to a non-existent location twice (IMPALA-741)
-    self.hdfs_client.delete_file_dir("tmp/dont_exist1/", recursive=True)
-    self.hdfs_client.delete_file_dir("tmp/dont_exist2/", recursive=True)
+    self.filesystem_client.delete_file_dir("tmp/dont_exist1/", recursive=True)
+    self.filesystem_client.delete_file_dir("tmp/dont_exist2/", recursive=True)
 
     self.execute_query_expect_success(self.client,
         "alter table {0} partition(j=1,s='1') set location 
'{1}/tmp/dont_exist1'"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/metadata/test_explain.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_explain.py b/tests/metadata/test_explain.py
index 1bc6011..0719a21 100644
--- a/tests/metadata/test_explain.py
+++ b/tests/metadata/test_explain.py
@@ -97,33 +97,33 @@ class TestExplainEmptyPartition(ImpalaTestSuite):
   def teardown_method(self, method):
     self.cleanup_db(self.TEST_DB_NAME)
 
-  @SkipIfS3.hdfs_client
   @SkipIfLocal.hdfs_client
   def test_non_empty_partition_0_rows(self):
     """Regression test for IMPALA-1708: if a partition has 0 rows but > 0 
files after
     COMPUTE STATS, don't warn the user about missing stats. The files are 
probably
     corrupted, or used for something else."""
     self.client.execute("SET EXPLAIN_LEVEL=3")
-    self.client.execute(
-      "CREATE TABLE %s.empty_partition (col int) partitioned by (p int)" % self.TEST_DB_NAME);
+    self.client.execute("CREATE TABLE %s.empty_partition (col int) "
+                        "partitioned by (p int)" % self.TEST_DB_NAME)
     self.client.execute(
       "ALTER TABLE %s.empty_partition ADD PARTITION (p=NULL)" % 
self.TEST_DB_NAME)
     # Put an empty file in the partition so we have > 0 files, but 0 rows
-    self.hdfs_client.create_file(
-      "test-warehouse/%s.db/empty_partition/p=__HIVE_DEFAULT_PARTITION__/empty" %
-      self.TEST_DB_NAME, "")
+    self.filesystem_client.create_file(
+        "test-warehouse/%s.db/empty_partition/p=__HIVE_DEFAULT_PARTITION__/empty" %
+        self.TEST_DB_NAME, "")
     self.client.execute("REFRESH %s.empty_partition" % self.TEST_DB_NAME)
     self.client.execute("COMPUTE STATS %s.empty_partition" % self.TEST_DB_NAME)
     assert "NULL\t0\t1" in str(
-      self.client.execute("SHOW PARTITIONS %s.empty_partition" % 
self.TEST_DB_NAME))
+        self.client.execute("SHOW PARTITIONS %s.empty_partition" % 
self.TEST_DB_NAME))
     assert "missing relevant table and/or column statistics" not in str(
-      self.client.execute("EXPLAIN SELECT * FROM %s.empty_partition" % 
self.TEST_DB_NAME))
+        self.client.execute(
+            "EXPLAIN SELECT * FROM %s.empty_partition" % self.TEST_DB_NAME))
 
    # Now add a partition with some data (so it gets selected into the scan), to check
     # that its lack of stats is correctly identified
     self.client.execute(
       "ALTER TABLE %s.empty_partition ADD PARTITION (p=1)" % self.TEST_DB_NAME)
-    self.hdfs_client.create_file("test-warehouse/%s.db/empty_partition/p=1/rows" %
+    self.filesystem_client.create_file("test-warehouse/%s.db/empty_partition/p=1/rows" %
                                  self.TEST_DB_NAME, "1")
     self.client.execute("REFRESH %s.empty_partition" % self.TEST_DB_NAME)
     explain_result = str(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/metadata/test_hdfs_encryption.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_hdfs_encryption.py b/tests/metadata/test_hdfs_encryption.py
index 049734d..037b4a0 100644
--- a/tests/metadata/test_hdfs_encryption.py
+++ b/tests/metadata/test_hdfs_encryption.py
@@ -28,7 +28,7 @@ PYWEBHDFS_TMP_DIR = 'tmp/test_encryption_load_data'
 TMP_DIR = '/%s' % (PYWEBHDFS_TMP_DIR)
 
 
[email protected]_data
[email protected]_encryption
 @SkipIfIsilon.hdfs_encryption
 @SkipIfLocal.hdfs_encryption
 @pytest.mark.execute_serially
@@ -136,7 +136,6 @@ class TestHdfsEncryption(ImpalaTestSuite):
     else:
      self.client.execute('load data inpath \'%s\' into table tbl ' % (TMP_DIR))
 
-  @SkipIfS3.hdfs_client
   @SkipIfIsilon.hdfs_encryption
   @pytest.mark.execute_serially
   def test_drop_partition_encrypt(self):
@@ -187,7 +186,6 @@ class TestHdfsEncryption(ImpalaTestSuite):
    assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/j=3/j3.txt".format(TEST_DB))
    assert not self.hdfs_client.exists("test-warehouse/{0}.db/t1/j=3".format(TEST_DB))
 
-  @SkipIfS3.hdfs_client
   @SkipIfIsilon.hdfs_encryption
   @pytest.mark.execute_serially
   def test_drop_table_encrypt(self):

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/metadata/test_hdfs_permissions.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_hdfs_permissions.py b/tests/metadata/test_hdfs_permissions.py
index 4d72354..05a6ac4 100644
--- a/tests/metadata/test_hdfs_permissions.py
+++ b/tests/metadata/test_hdfs_permissions.py
@@ -25,7 +25,7 @@ TEST_TBL = 'read_only_tbl'
 TBL_LOC = '%s/%s' % (WAREHOUSE, TEST_TBL)
 
 
[email protected]
[email protected]_acls
 @SkipIfLocal.hdfs_client
 class TestHdfsPermissions(ImpalaTestSuite):
   @classmethod
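
The skip-marker changes above narrow each skip to the capability S3 actually lacks
(ACLs, the HDFS trash) instead of skipping every INSERT. A hedged sketch of how such
markers are typically declared with pytest; the IS_S3 flag and the reason strings are
assumptions, not the project's exact definitions:

    import pytest
    from tests.util.filesystem_utils import IS_S3  # assumed flag name

    class SkipIfS3(object):
      # Applied to tests that assert on HDFS-style ACLs, which S3 lacks.
      hdfs_acls = pytest.mark.skipif(IS_S3, reason="HDFS ACLs not supported on S3")
      # Applied to PURGE tests, which assert on the HDFS trash directory.
      hdfs_purge = pytest.mark.skipif(IS_S3, reason="PURGE semantics need the HDFS trash")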

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/metadata/test_last_ddl_time_update.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_last_ddl_time_update.py b/tests/metadata/test_last_ddl_time_update.py
index dfb89f8..cbcb105 100644
--- a/tests/metadata/test_last_ddl_time_update.py
+++ b/tests/metadata/test_last_ddl_time_update.py
@@ -72,7 +72,6 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite):
     self.run_test("alter table %s set fileformat textfile" % FULL_NAME, True)
 
   @pytest.mark.execute_serially
-  @SkipIfS3.insert
   def test_insert(self, vector):
     # static partition insert
     self.run_test("insert into %s partition(j=1, s='2012') "

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/metadata/test_load.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_load.py b/tests/metadata/test_load.py
index bdf2db0..47703ec 100644
--- a/tests/metadata/test_load.py
+++ b/tests/metadata/test_load.py
@@ -7,7 +7,7 @@ from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_dimensions import (
     create_single_exec_option_dimension,
     create_uncompressed_text_dimension)
-from tests.common.skip import SkipIfS3, SkipIfLocal
+from tests.common.skip import SkipIfLocal
 from tests.util.filesystem_utils import WAREHOUSE
 
 TEST_TBL_PART = "test_load"
@@ -18,7 +18,6 @@ MULTIAGG_PATH = 'test-warehouse/alltypesaggmultifiles/year=2010/month=1/day=1'
 HIDDEN_FILES = ["{0}/3/.100101.txt".format(STAGING_PATH),
                 "{0}/3/_100101.txt".format(STAGING_PATH)]
 
[email protected]_data
 @SkipIfLocal.hdfs_client
 class TestLoadData(ImpalaTestSuite):
 
@@ -35,7 +34,7 @@ class TestLoadData(ImpalaTestSuite):
   def _clean_test_tables(self):
     self.client.execute("drop table if exists 
functional.{0}".format(TEST_TBL_NOPART))
     self.client.execute("drop table if exists 
functional.{0}".format(TEST_TBL_PART))
-    self.hdfs_client.delete_file_dir(STAGING_PATH, recursive=True)
+    self.filesystem_client.delete_file_dir(STAGING_PATH, recursive=True)
 
   def teardown_method(self, method):
     self._clean_test_tables()
@@ -47,34 +46,32 @@ class TestLoadData(ImpalaTestSuite):
    # Create staging directories for load data inpath. The staging directory is laid out
     # as follows:
     #   - It has 6 sub directories, numbered 1-6
-    #   - The directories are populated with files from a subset of partitions in existing
-    #     partitioned tables.
+    #   - The directories are populated with files from a subset of partitions in
+    #     existing partitioned tables.
     #   - Sub Directories 1-4 have single files copied from alltypes/
    #   - Sub Directories 5-6 have multiple files (4) copied from alltypesaggmultifiles
     #   - Sub Directory 3 also has hidden files, in both supported formats.
     #   - All sub-dirs contain a hidden directory
     for i in xrange(1, 6):
       stagingDir = '{0}/{1}'.format(STAGING_PATH, i)
-      self.hdfs_client.make_dir(stagingDir, permission=777)
-      self.hdfs_client.make_dir('{0}/_hidden_dir'.format(stagingDir), permission=777)
-
+      self.filesystem_client.make_dir(stagingDir, permission=777)
+      self.filesystem_client.make_dir('{0}/_hidden_dir'.format(stagingDir),
+                                      permission=777)
     # Copy single file partitions from alltypes.
     for i in xrange(1, 4):
-      self.hdfs_client.copy(ALLTYPES_PATH, "{0}/{1}/100101.txt".format(STAGING_PATH, i))
-
+      self.filesystem_client.copy(ALLTYPES_PATH,
+                               "{0}/{1}/100101.txt".format(STAGING_PATH, i))
     # Copy multi file partitions from alltypesaggmultifiles.
-    file_infos = self.hdfs_client.list_dir(
-        MULTIAGG_PATH).get('FileStatuses').get('FileStatus')
-    file_names = [info.get('pathSuffix') for info in file_infos]
+    file_names = self.filesystem_client.ls(MULTIAGG_PATH)
     for i in xrange(4, 6):
       for file_ in file_names:
-        self.hdfs_client.copy(
+        self.filesystem_client.copy(
             "{0}/{1}".format(MULTIAGG_PATH, file_),
             '{0}/{1}/{2}'.format(STAGING_PATH, i, file_))
 
     # Create two hidden files, with a leading . and _
     for file_ in HIDDEN_FILES:
-      self.hdfs_client.copy(ALLTYPES_PATH, file_)
+      self.filesystem_client.copy(ALLTYPES_PATH, file_)
 
     # Create both the test tables.
     self.client.execute("create table functional.{0} like functional.alltypes"
@@ -86,5 +83,4 @@ class TestLoadData(ImpalaTestSuite):
     self.run_test_case('QueryTest/load', vector)
    # The hidden files should not have been moved as part of the load operation.
     for file_ in HIDDEN_FILES:
-      assert self.hdfs_client.get_file_dir_status(file_), "{0} does not exist".format(
-          file_)
+      assert self.filesystem_client.exists(file_), "{0} does not exist".format(file_)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/metadata/test_partition_metadata.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_partition_metadata.py b/tests/metadata/test_partition_metadata.py
index 5c980bb..228bec4 100644
--- a/tests/metadata/test_partition_metadata.py
+++ b/tests/metadata/test_partition_metadata.py
@@ -56,7 +56,6 @@ class TestPartitionMetadata(ImpalaTestSuite):
   def teardown_method(self, method):
     self.cleanup_db(self.TEST_DB)
 
-  @SkipIfS3.insert # S3: missing coverage: partition DDL
   @SkipIfLocal.hdfs_client
   def test_multiple_partitions_same_location(self, vector):
     """Regression test for IMPALA-597. Verifies Impala is able to properly read
@@ -64,16 +63,16 @@ class TestPartitionMetadata(ImpalaTestSuite):
     """
     self.client.execute("use %s" % self.TEST_DB)
     impala_location = '%s/%s.db/%s' % (WAREHOUSE, self.TEST_DB, self.TEST_TBL)
-    hdfs_client_location = impala_location.split("/")[-1]
+    filesystem_client_location = impala_location.split("/")[-1]
     # Cleanup any existing data in the table directory.
-    self.hdfs_client.delete_file_dir(hdfs_client_location, recursive=True)
+    self.filesystem_client.delete_file_dir(filesystem_client_location, recursive=True)
     # Create the table
     self.client.execute("create table {0}(i int) partitioned by(j int)"
         "location '{1}/{2}.db/{0}'".format(self.TEST_TBL, WAREHOUSE, 
self.TEST_DB))
 
    # Point multiple partitions to the same location and use partition locations that
     # do not contain a key=value path.
-    self.hdfs_client.make_dir(hdfs_client_location + '/p')
+    self.filesystem_client.make_dir(filesystem_client_location + '/p')
 
     # Point both partitions to the same location.
     self.client.execute("alter table %s add partition (j=1) location '%s/p'" %

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/metadata/test_recover_partitions.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_recover_partitions.py b/tests/metadata/test_recover_partitions.py
index 9133b07..a219433 100644
--- a/tests/metadata/test_recover_partitions.py
+++ b/tests/metadata/test_recover_partitions.py
@@ -16,7 +16,7 @@
 import pytest
 from tests.common.test_dimensions import ALL_NODES_ONLY
 from tests.common.impala_test_suite import *
-from tests.common.skip import SkipIfS3, SkipIfLocal
+from tests.common.skip import SkipIfLocal
 from tests.util.filesystem_utils import WAREHOUSE, IS_DEFAULT_FS
 
 # Validates ALTER TABLE RECOVER PARTITIONS statement
@@ -59,7 +59,6 @@ class TestRecoverPartitions(ImpalaTestSuite):
   def teardown_method(self, method):
     self.cleanup_db(self.TEST_DB)
 
-  @SkipIfS3.insert
   @SkipIfLocal.hdfs_client
   @pytest.mark.execute_serially
   def test_recover_partitions(self, vector):
@@ -81,8 +80,9 @@ class TestRecoverPartitions(ImpalaTestSuite):
 
    # Create a path for a new partition using hdfs client and add a file with some values.
    # Test that the partition can be recovered and that the inserted data are accessible.
-    self.hdfs_client.make_dir(self.BASE_DIR + leaf_dir)
-    self.hdfs_client.create_file(self.BASE_DIR + leaf_dir + file_path, inserted_value)
+    self.filesystem_client.make_dir(self.BASE_DIR + leaf_dir)
+    self.filesystem_client.create_file(self.BASE_DIR + leaf_dir + file_path,
+                                       inserted_value)
     result = self.execute_query_expect_success(self.client,
         "SHOW PARTITIONS %s" % (self.TEST_TBL))
     assert self.has_value(part_name, result.data) == False
@@ -102,7 +102,7 @@ class TestRecoverPartitions(ImpalaTestSuite):
     result = self.execute_query_expect_success(self.client,
         "SHOW PARTITIONS %s" % (self.TEST_TBL))
     old_length = len(result.data)
-    self.hdfs_client.make_dir(self.BASE_DIR + malformed_dir)
+    self.filesystem_client.make_dir(self.BASE_DIR + malformed_dir)
     self.execute_query_expect_success(self.client,
         "ALTER TABLE %s RECOVER PARTITIONS" % (self.TEST_TBL))
     result = self.execute_query_expect_success(self.client,
@@ -113,8 +113,9 @@ class TestRecoverPartitions(ImpalaTestSuite):
 
    # Create a directory whose subdirectory names contain __HIVE_DEFAULT_PARTITION__
    # and check that it is recovered as a NULL partition.
-    self.hdfs_client.make_dir(self.BASE_DIR + null_dir)
-    self.hdfs_client.create_file(self.BASE_DIR + null_dir + file_path, null_inserted_value)
+    self.filesystem_client.make_dir(self.BASE_DIR + null_dir)
+    self.filesystem_client.create_file(
+        self.BASE_DIR + null_dir + file_path, null_inserted_value)
     result = self.execute_query_expect_success(self.client,
         "SHOW PARTITIONS %s" % (self.TEST_TBL))
     assert self.has_value(self.DEF_NULL_PART_KEY, result.data) == False
@@ -129,7 +130,6 @@ class TestRecoverPartitions(ImpalaTestSuite):
         "select c from %s" % self.TEST_TBL)
     assert self.has_value(null_inserted_value, result.data) == True
 
-  @SkipIfS3.insert
   @SkipIfLocal.hdfs_client
   @pytest.mark.execute_serially
   def test_nondefault_location_partitions(self, vector):
@@ -149,9 +149,10 @@ class TestRecoverPartitions(ImpalaTestSuite):
     self.execute_query_expect_success(self.client,
         "ALTER TABLE %s PARTITION (i=1, p='p3') SET LOCATION '%s/%s.db/tmp' "
         % (self.TEST_TBL, WAREHOUSE, self.TEST_DB))
-    self.hdfs_client.delete_file_dir(self.BASE_DIR + leaf_dir, recursive=True)
-    self.hdfs_client.make_dir(self.BASE_DIR + leaf_dir);
-    self.hdfs_client.create_file(self.BASE_DIR + leaf_dir + file_path, inserted_value)
+    self.filesystem_client.delete_file_dir(self.BASE_DIR + leaf_dir, recursive=True)
+    self.filesystem_client.make_dir(self.BASE_DIR + leaf_dir);
+    self.filesystem_client.create_file(self.BASE_DIR + leaf_dir + file_path,
+                                       inserted_value)
     self.execute_query_expect_success(self.client,
         "ALTER TABLE %s RECOVER PARTITIONS" % (self.TEST_TBL))
     # Ensure that no duplicate partitions are recovered.
@@ -166,7 +167,6 @@ class TestRecoverPartitions(ImpalaTestSuite):
         "select c from %s" % self.TEST_TBL)
     assert self.has_value(inserted_value, result.data) == True
 
-  @SkipIfS3.insert
   @SkipIfLocal.hdfs_client
   @pytest.mark.execute_serially
   def test_duplicate_partitions(self, vector):
@@ -184,14 +184,15 @@ class TestRecoverPartitions(ImpalaTestSuite):
 
     # Create a partition with path "/i=1/p=p4".
     # Create a path "/i=0001/p=p4" using hdfs client, and add a file with some 
values.
-    # Test that no new partition will be recovered and the inserted data are 
not accessible.
+    # Test that no new partition will be recovered and the inserted data are 
not
+    # accessible.
     leaf_dir = "i=0001/p=p4/"
     inserted_value = "5"
 
     self.execute_query_expect_success(self.client,
         "ALTER TABLE %s ADD PARTITION(i=1, p='p4')" % (self.TEST_TBL))
-    self.hdfs_client.make_dir(self.BASE_DIR + leaf_dir);
-    self.hdfs_client.create_file(self.BASE_DIR + leaf_dir + file_path, inserted_value)
+    self.filesystem_client.make_dir(self.BASE_DIR + leaf_dir);
+    self.filesystem_client.create_file(self.BASE_DIR + leaf_dir + file_path, inserted_value)
     self.execute_query_expect_success(self.client,
         "ALTER TABLE %s RECOVER PARTITIONS" % (self.TEST_TBL))
     result = self.execute_query_expect_success(self.client,
@@ -205,8 +206,8 @@ class TestRecoverPartitions(ImpalaTestSuite):
     result = self.execute_query_expect_success(self.client,
         "SHOW PARTITIONS %s" % (self.TEST_TBL))
     old_length = len(result.data)
-    self.hdfs_client.make_dir(self.BASE_DIR + same_value_dir1)
-    self.hdfs_client.make_dir(self.BASE_DIR + same_value_dir2)
+    self.filesystem_client.make_dir(self.BASE_DIR + same_value_dir1)
+    self.filesystem_client.make_dir(self.BASE_DIR + same_value_dir2)
     # Only one partition will be added.
     self.execute_query_expect_success(self.client,
         "ALTER TABLE %s RECOVER PARTITIONS" % (self.TEST_TBL))
@@ -216,7 +217,6 @@ class TestRecoverPartitions(ImpalaTestSuite):
         "ALTER TABLE %s RECOVER PARTITIONS failed to handle duplicate 
partition key values."
         % (self.TEST_TBL))
 
-  @SkipIfS3.insert
   @SkipIfLocal.hdfs_client
   @pytest.mark.execute_serially
   def test_post_invalidate(self, vector):
@@ -233,8 +233,9 @@ class TestRecoverPartitions(ImpalaTestSuite):
     # Test that the recovered partitions are properly stored in Hive MetaStore.
     # Invalidate the table metadata and then check if the recovered partitions
     # are accessible.
-    self.hdfs_client.make_dir(self.BASE_DIR + leaf_dir);
-    self.hdfs_client.create_file(self.BASE_DIR + leaf_dir + file_path, inserted_value)
+    self.filesystem_client.make_dir(self.BASE_DIR + leaf_dir);
+    self.filesystem_client.create_file(self.BASE_DIR + leaf_dir + file_path,
+                                       inserted_value)
     self.execute_query_expect_success(self.client,
         "ALTER TABLE %s RECOVER PARTITIONS" % (self.TEST_TBL))
     result = self.execute_query_expect_success(self.client,
@@ -252,7 +253,6 @@ class TestRecoverPartitions(ImpalaTestSuite):
         "select c from %s" % self.TEST_TBL)
     assert self.has_value('4', result.data) == True
 
-  @SkipIfS3.insert
   @SkipIfLocal.hdfs_client
   @pytest.mark.execute_serially
   def test_support_all_types(self, vector):
@@ -279,7 +279,7 @@ class TestRecoverPartitions(ImpalaTestSuite):
         "SHOW PARTITIONS %s" % (self.TEST_TBL2))
     old_length = len(result.data)
     normal_dir = '/'.join(normal_values)
-    self.hdfs_client.make_dir(self.BASE_DIR2 + normal_dir)
+    self.filesystem_client.make_dir(self.BASE_DIR2 + normal_dir)
     # One partition will be added.
     self.execute_query_expect_success(self.client,
         "ALTER TABLE %s RECOVER PARTITIONS" % (self.TEST_TBL2))
@@ -308,7 +308,7 @@ class TestRecoverPartitions(ImpalaTestSuite):
           invalid_dir += (normal_values[j] + "/")
         else:
           invalid_dir += (invalid_values[j] + "/")
-      self.hdfs_client.make_dir(self.BASE_DIR2 + invalid_dir)
+      self.filesystem_client.make_dir(self.BASE_DIR2 + invalid_dir)
       # No partition will be added.
       self.execute_query_expect_success(self.client,
           "ALTER TABLE %s RECOVER PARTITIONS" % (self.TEST_TBL2))

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/metadata/test_show_create_table.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_show_create_table.py b/tests/metadata/test_show_create_table.py
index c9dd5aa..fdd868d 100644
--- a/tests/metadata/test_show_create_table.py
+++ b/tests/metadata/test_show_create_table.py
@@ -47,7 +47,6 @@ class TestShowCreateTable(ImpalaTestSuite):
         v.get_value('table_format').file_format == 'text' and
         v.get_value('table_format').compression_codec == 'none')
 
-  @SkipIfS3.insert
   def test_show_create_table(self, vector, unique_database):
    self.__run_show_create_table_test_case('QueryTest/show-create-table', vector,
                                            unique_database)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_aggregation.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_aggregation.py b/tests/query_test/test_aggregation.py
index 99bfb22..0d113a7 100644
--- a/tests/query_test/test_aggregation.py
+++ b/tests/query_test/test_aggregation.py
@@ -94,7 +94,6 @@ class TestAggregationQueries(ImpalaTestSuite):
    if cls.exploration_strategy() == 'core':
      cls.TestMatrix.add_dimension(create_uncompressed_text_dimension(cls.get_workload()))
 
-  @SkipIfS3.insert
   @pytest.mark.execute_serially
   def test_non_codegen_tinyint_grouping(self, vector):
    # Regression for IMPALA-901. The test includes an INSERT statement, so can only be run

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_cancellation.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_cancellation.py b/tests/query_test/test_cancellation.py
index ea619e3..729ed3b 100644
--- a/tests/query_test/test_cancellation.py
+++ b/tests/query_test/test_cancellation.py
@@ -157,7 +157,6 @@ class TestCancellationSerial(TestCancellation):
      cls.TestMatrix.add_constraint(lambda v: v.get_value('cancel_delay') in [3])
      cls.TestMatrix.add_constraint(lambda v: v.get_value('query') == choice(QUERIES))
 
-  @SkipIfS3.insert
   @pytest.mark.execute_serially
   def test_cancel_insert(self, vector):
     self.execute_cancel_test(vector)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_chars.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_chars.py b/tests/query_test/test_chars.py
index 11381b3..a1bfe58 100644
--- a/tests/query_test/test_chars.py
+++ b/tests/query_test/test_chars.py
@@ -9,7 +9,6 @@ from tests.common.impala_test_suite import *
 from tests.common.skip import SkipIfS3
 from tests.util.filesystem_utils import WAREHOUSE
 
[email protected]
 class TestStringQueries(ImpalaTestSuite):
   @classmethod
   def get_workload(cls):

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_compressed_formats.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_compressed_formats.py b/tests/query_test/test_compressed_formats.py
index a50fafe..15f799c 100644
--- a/tests/query_test/test_compressed_formats.py
+++ b/tests/query_test/test_compressed_formats.py
@@ -111,7 +111,6 @@ class TestCompressedFormats(ImpalaTestSuite):
     finally:
       call(["hive", "-e", drop_cmd]);
 
[email protected]
 class TestTableWriters(ImpalaTestSuite):
   @classmethod
   def get_workload(cls):
@@ -143,7 +142,6 @@ class TestTableWriters(ImpalaTestSuite):
     pytest.skip()
     self.run_test_case('QueryTest/text-writer', vector)
 
[email protected]
 @pytest.mark.execute_serially
 class TestLargeCompressedFile(ImpalaTestSuite):
   """ Tests that we gracefully handle when a compressed file in HDFS is larger

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_delimited_text.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_delimited_text.py b/tests/query_test/test_delimited_text.py
index 864b411..371b7f4 100644
--- a/tests/query_test/test_delimited_text.py
+++ b/tests/query_test/test_delimited_text.py
@@ -43,11 +43,9 @@ class TestDelimitedText(ImpalaTestSuite):
    self.client.execute('drop table if exists %s.nl_queries' % self.TEST_DB_NAME)
     self.client.execute('drop database if exists %s' % self.TEST_DB_NAME)
 
-  @SkipIfS3.insert
   def test_delimited_text(self, vector):
     self.run_test_case('QueryTest/delimited-text', vector)
 
-  @SkipIfS3.insert
   def test_delimited_text_newlines(self, vector):
     """ Test text with newlines in strings - IMPALA-1943. Execute queries from 
Python to
     avoid issues with newline handling in test file format. """
@@ -73,7 +71,6 @@ class TestDelimitedText(ImpalaTestSuite):
     assert len(result.data) == 1
     assert result.data[0] == "2"
 
-  @SkipIfS3.insert
   @pytest.mark.execute_serially
   def test_delimited_text_latin_chars(self, vector):
     """Verifies Impala is able to properly handle delimited text that contains

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_insert.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert.py b/tests/query_test/test_insert.py
index 981520f..6929fca 100644
--- a/tests/query_test/test_insert.py
+++ b/tests/query_test/test_insert.py
@@ -15,7 +15,6 @@ from tests.util.filesystem_utils import IS_LOCAL
 # TODO: Add Gzip back.  IMPALA-424
 PARQUET_CODECS = ['none', 'snappy']
 
[email protected]
 class TestInsertQueries(ImpalaTestSuite):
   @classmethod
   def get_workload(self):
@@ -125,7 +124,6 @@ class TestInsertWideTable(ImpalaTestSuite):
    actual = QueryTestResult(parse_result_rows(result), types, labels, order_matters=False)
     assert expected == actual
 
[email protected]
 class TestInsertPartKey(ImpalaTestSuite):
   """Regression test for IMPALA-875"""
   @classmethod
@@ -151,7 +149,6 @@ class TestInsertPartKey(ImpalaTestSuite):
     self.run_test_case('QueryTest/insert_part_key', vector,
         multiple_impalad=vector.get_value('exec_option')['sync_ddl'] == 1)
 
[email protected]
 class TestInsertNullQueries(ImpalaTestSuite):
   @classmethod
   def get_workload(self):

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_insert_behaviour.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert_behaviour.py b/tests/query_test/test_insert_behaviour.py
index d6c344b..a03ce00 100644
--- a/tests/query_test/test_insert_behaviour.py
+++ b/tests/query_test/test_insert_behaviour.py
@@ -22,8 +22,6 @@ from tests.common.parametrize import UniqueDatabase
 from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal
 from tests.util.filesystem_utils import WAREHOUSE, get_fs_path
 
-
[email protected]
 @SkipIfLocal.hdfs_client
 class TestInsertBehaviour(ImpalaTestSuite):
   """Tests for INSERT behaviour that isn't covered by checking query results"""
@@ -45,12 +43,12 @@ class TestInsertBehaviour(ImpalaTestSuite):
   def test_insert_removes_staging_files(self):
     TBL_NAME = "insert_overwrite_nopart"
     insert_staging_dir = ("test-warehouse/functional.db/%s/"
-                          "_impala_insert_staging" % TBL_NAME)
-    self.hdfs_client.delete_file_dir(insert_staging_dir, recursive=True)
-    self.client.execute("INSERT OVERWRITE functional.%s"
-                        " SELECT int_col FROM functional.tinyinttable" % TBL_NAME)
-    ls = self.hdfs_client.list_dir(insert_staging_dir)
-    assert len(ls['FileStatuses']['FileStatus']) == 0
+        "_impala_insert_staging" % TBL_NAME)
+    self.filesystem_client.delete_file_dir(insert_staging_dir, recursive=True)
+    self.client.execute("INSERT OVERWRITE functional.%s "
+                        "SELECT int_col FROM functional.tinyinttable" % TBL_NAME)
+    ls = self.filesystem_client.ls(insert_staging_dir)
+    assert len(ls) == 0
 
   @pytest.mark.execute_serially
   def test_insert_preserves_hidden_files(self):
@@ -59,27 +57,28 @@ class TestInsertBehaviour(ImpalaTestSuite):
     table_dir = "test-warehouse/functional.db/%s/" % TBL_NAME
     hidden_file_locations = [".hidden", "_hidden"]
     dir_locations = ["dir", ".hidden_dir"]
+
     for dir_ in dir_locations:
-      self.hdfs_client.make_dir(table_dir + dir_)
+      self.filesystem_client.make_dir(table_dir + dir_)
+
+    # We do this here because the above 'make_dir' call doesn't make a directory for S3.
+    for dir_ in dir_locations:
+      self.filesystem_client.create_file(
+          table_dir + dir_ + '/' + hidden_file_locations[0] , '', overwrite=True)
+
     for file_ in hidden_file_locations:
-      self.hdfs_client.create_file(table_dir + file_, '', overwrite=True)
+      self.filesystem_client.create_file(table_dir + file_, '', overwrite=True)
 
     self.client.execute("INSERT OVERWRITE functional.%s"
                         " SELECT int_col FROM functional.tinyinttable" % 
TBL_NAME)
 
     for file_ in hidden_file_locations:
-      try:
-        self.hdfs_client.get_file_dir_status(table_dir + file_)
-      except:
-        err_msg = "Hidden file '%s' was unexpectedly deleted by INSERT 
OVERWRITE"
-        pytest.fail(err_msg % (table_dir + file_))
+      assert self.filesystem_client.exists(table_dir + file_), "Hidden file {0} was " \
+          "unexpectedly deleted by INSERT OVERWRITE".format(table_dir + file_)
 
     for dir_ in dir_locations:
-      try:
-        self.hdfs_client.get_file_dir_status(table_dir + file_)
-      except:
-        err_msg = "Directory '%s' was unexpectedly deleted by INSERT OVERWRITE"
-        pytest.fail(err_msg % (table_dir + dir_))
+      assert self.filesystem_client.exists(table_dir + dir_), "Directory {0} was " \
+          "unexpectedly deleted by INSERT OVERWRITE".format(table_dir + dir_)

  @UniqueDatabase.parametrize(name_prefix='test_insert_alter_partition_location_db')
   def test_insert_alter_partition_location(self, unique_database):
@@ -90,7 +89,7 @@ class TestInsertBehaviour(ImpalaTestSuite):
    table_name = "`{0}`.`insert_alter_partition_location`".format(unique_database)
 
     self.execute_query_expect_success(self.client, "DROP TABLE IF EXISTS %s" % 
table_name)
-    self.hdfs_client.delete_file_dir(part_dir, recursive=True)
+    self.filesystem_client.delete_file_dir(part_dir, recursive=True)
 
     self.execute_query_expect_success(
         self.client,
@@ -113,9 +112,9 @@ class TestInsertBehaviour(ImpalaTestSuite):
 
    # Should have created the partition dir, which should contain exactly one file (not in
     # a subdirectory)
-    ls = self.hdfs_client.list_dir(part_dir)
-    assert len(ls['FileStatuses']['FileStatus']) == 1
+    assert len(self.filesystem_client.ls(part_dir)) == 1
 
+  @SkipIfS3.hdfs_acls
   @SkipIfIsilon.hdfs_acls
   @pytest.mark.xfail(run=False, reason="Fails intermittently on test clusters")
   @pytest.mark.execute_serially
@@ -175,6 +174,7 @@ class TestInsertBehaviour(ImpalaTestSuite):
                                       "PARTITION(p1=1, p2=2, p3=30) VALUES(1)")
     check_has_acls("p1=1/p2=2/p3=30", "default:group:new_leaf_group:-w-")
 
+  @SkipIfS3.hdfs_acls
   @SkipIfIsilon.hdfs_acls
   def test_insert_file_permissions(self, unique_database):
     """Test that INSERT correctly respects file permission (minimum ACLs)"""
@@ -224,6 +224,7 @@ class TestInsertBehaviour(ImpalaTestSuite):
     # Should be writable because 'other' ACLs allow writes
     self.execute_query_expect_success(self.client, insert_query)
 
+  @SkipIfS3.hdfs_acls
   @SkipIfIsilon.hdfs_acls
   def test_insert_acl_permissions(self, unique_database):
     """Test that INSERT correctly respects ACLs"""
@@ -300,6 +301,7 @@ class TestInsertBehaviour(ImpalaTestSuite):
     # Should be writable because 'other' ACLs allow writes
     self.execute_query_expect_success(self.client, insert_query)
 
+  @SkipIfS3.hdfs_acls
   @SkipIfIsilon.hdfs_acls
   def test_load_permissions(self, unique_database):
    # We rely on test_insert_acl_permissions() to exhaustively check that ACL semantics
@@ -357,13 +359,9 @@ class TestInsertBehaviour(ImpalaTestSuite):
     """Test insert/select query won't trigger partition directory or zero size 
data file
     creation if the resultset of select is empty."""
     def check_path_exists(path, should_exist):
-      try:
-        self.hdfs_client.get_file_dir_status(path)
-        if not should_exist:
-          pytest.fail("file/dir '%s' unexpectedly exists" % path)
-      except Exception:
-        if should_exist:
-          pytest.fail("file/dir '%s' does not exist" % path)
+      assert self.filesystem_client.exists(path) == should_exist, "file/dir '{0}' " \
+          "should {1}exist but does {2}exist.".format(
+              path, '' if should_exist else 'not ', 'not ' if should_exist else '')
 
     db_path = "test-warehouse/%s.db/" % self.TEST_DB_NAME
     table_path = db_path + "test_insert_empty_result"
@@ -382,8 +380,8 @@ class TestInsertBehaviour(ImpalaTestSuite):
     insert_query = ("INSERT INTO TABLE {0} PARTITION(year=2009, month=1)"
                     "select 1, 1 from {0} LIMIT 0".format(table_name))
     self.execute_query_expect_success(self.client, insert_query)
-    # Partition directory should not be created
-    check_path_exists(partition_path, False)
+    # Partition directory is created
+    check_path_exists(partition_path, True)
 
     # Insert one record
     insert_query_one_row = ("INSERT INTO TABLE %s PARTITION(year=2009, 
month=1) "
@@ -391,24 +389,24 @@ class TestInsertBehaviour(ImpalaTestSuite):
     self.execute_query_expect_success(self.client, insert_query_one_row)
     # Partition directory should be created with one data file
     check_path_exists(partition_path, True)
-    ls = self.hdfs_client.list_dir(partition_path)
-    assert len(ls['FileStatuses']['FileStatus']) == 1
+    ls = (self.filesystem_client.ls(partition_path))
+    assert len(ls) == 1
 
     # Run an insert/select statement that returns an empty resultset again
     self.execute_query_expect_success(self.client, insert_query)
     # No new data file should be created
-    new_ls = self.hdfs_client.list_dir(partition_path)
-    assert len(new_ls['FileStatuses']['FileStatus']) == 1
-    assert new_ls['FileStatuses'] == ls['FileStatuses']
+    new_ls = self.filesystem_client.ls(partition_path)
+    assert len(new_ls) == 1
+    assert new_ls == ls
 
     # Run an insert overwrite/select that returns an empty resultset
     insert_query = ("INSERT OVERWRITE {0} PARTITION(year=2009, month=1)"
                     " select 1, 1 from  {0} LIMIT 0".format(table_name))
     self.execute_query_expect_success(self.client, insert_query)
     # Data file should be deleted
-    new_ls2 = self.hdfs_client.list_dir(partition_path)
-    assert len(new_ls2['FileStatuses']['FileStatus']) == 0
-    assert new_ls['FileStatuses'] != new_ls2['FileStatuses']
+    new_ls2 = self.filesystem_client.ls(partition_path)
+    assert len(new_ls2) == 0
+    assert new_ls != new_ls2
 
    # Test for IMPALA-2008 insert overwrite to an empty table with empty dataset
     empty_target_tbl = "test_overwrite_with_empty_target"
@@ -422,9 +420,10 @@ class TestInsertBehaviour(ImpalaTestSuite):
     # Delete target table directory, query should fail with
     # "No such file or directory" error
     target_table_path = "%s%s" % (db_path, empty_target_tbl)
-    self.hdfs_client.delete_file_dir(target_table_path, recursive=True)
+    self.filesystem_client.delete_file_dir(target_table_path, recursive=True)
     self.execute_query_expect_failure(self.client, insert_query)
 
+  @SkipIfS3.hdfs_acls
   @SkipIfIsilon.hdfs_acls
   def test_multiple_group_acls(self, unique_database):
     """Test that INSERT correctly respects multiple group ACLs"""

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_insert_parquet.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py
index 0fcf083..a3c5f35 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -18,7 +18,6 @@ PARQUET_CODECS = ['none', 'snappy']
# TODO: these tests take a while so we don't want to go through too many sizes but
 # we should in more exhaustive testing
 PARQUET_FILE_SIZES = [0, 32 * 1024 * 1024]
[email protected]
 class TestInsertParquetQueries(ImpalaTestSuite):
   @classmethod
   def get_workload(self):
@@ -57,7 +56,6 @@ class TestInsertParquetQueries(ImpalaTestSuite):
         vector.get_value('compression_codec')
     self.run_test_case('insert_parquet', vector, multiple_impalad=True)
 
[email protected]
 class TestInsertParquetInvalidCodec(ImpalaTestSuite):
   @classmethod
   def get_workload(self):
@@ -88,7 +86,6 @@ class TestInsertParquetInvalidCodec(ImpalaTestSuite):
                        multiple_impalad=True)
 
 
[email protected]
 class TestInsertParquetVerifySize(ImpalaTestSuite):
   @classmethod
   def get_workload(self):
@@ -136,13 +133,10 @@ class TestInsertParquetVerifySize(ImpalaTestSuite):
    # Get the files in hdfs and verify. There can be at most 1 file that is smaller
    # than the BLOCK_SIZE. The rest should be within 80% of it and not over.
     found_small_file = False
-    ls = self.hdfs_client.list_dir(DIR)
-    for f in ls['FileStatuses']['FileStatus']:
-      if f['type'] != 'FILE':
-        continue
-      length = f['length']
-      print length
-      assert length < BLOCK_SIZE
-      if length < BLOCK_SIZE * 0.80:
+    sizes = self.filesystem_client.get_all_file_sizes(DIR)
+    for size in sizes:
+      assert size < BLOCK_SIZE, "File size greater than expected.\
+          Expected: {0}, Got: {1}".format(BLOCK_SIZE, size)
+      if size < BLOCK_SIZE * 0.80:
         assert found_small_file == False
         found_small_file = True
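
get_all_file_sizes() absorbs the FileStatus filtering that the removed loop did by
hand. A plausible reconstruction on top of WebHDFS, inferred from the deleted lines
(the real helper may differ):

    def get_all_file_sizes(hdfs_client, path):
      # Keep only FILE entries (skip directories) and return their lengths.
      statuses = hdfs_client.list_dir(path)['FileStatuses']['FileStatus']
      return [f['length'] for f in statuses if f['type'] == 'FILE']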

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_insert_permutation.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert_permutation.py b/tests/query_test/test_insert_permutation.py
index dd34e09..cac7336 100644
--- a/tests/query_test/test_insert_permutation.py
+++ b/tests/query_test/test_insert_permutation.py
@@ -30,7 +30,6 @@ class TestInsertQueriesWithPermutation(ImpalaTestSuite):
         cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[0]))
     cls.TestMatrix.add_dimension(create_uncompressed_text_dimension(cls.get_workload()))
 
-  @SkipIfS3.insert
   def test_insert_permutation(self, vector):
     map(self.cleanup_db, ["insert_permutation_test"])
     self.run_test_case('QueryTest/insert_permutation', vector)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_join_queries.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_join_queries.py b/tests/query_test/test_join_queries.py
index f617ffd..2729b31 100644
--- a/tests/query_test/test_join_queries.py
+++ b/tests/query_test/test_join_queries.py
@@ -96,7 +96,6 @@ class TestTPCHJoinQueries(ImpalaTestSuite):
     new_vector.get_value('exec_option')['batch_size'] = vector.get_value('batch_size')
     self.run_test_case('tpch-outer-joins', new_vector)
 
-@SkipIfS3.insert
 class TestSemiJoinQueries(ImpalaTestSuite):
   @classmethod
   def get_workload(cls):

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_multiple_filesystems.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_multiple_filesystems.py b/tests/query_test/test_multiple_filesystems.py
index 1ea2b50..926c247 100644
--- a/tests/query_test/test_multiple_filesystems.py
+++ b/tests/query_test/test_multiple_filesystems.py
@@ -8,7 +8,6 @@ from tests.common.test_dimensions import create_single_exec_option_dimension
 from tests.common.skip import SkipIf, SkipIfIsilon, SkipIfS3
 from tests.util.filesystem_utils import get_fs_path
 
-@SkipIfS3.insert
 @SkipIf.default_fs # Run only when a non-default filesystem is available.
 @SkipIfIsilon.untriaged # Missing coverage: Find out why this is failing.
 class TestMultipleFilesystems(ImpalaTestSuite):

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_partitioning.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_partitioning.py b/tests/query_test/test_partitioning.py
index cb74bb0..7e74b60 100644
--- a/tests/query_test/test_partitioning.py
+++ b/tests/query_test/test_partitioning.py
@@ -45,7 +45,7 @@ class TestPartitioning(ImpalaTestSuite):
   def setup_class(cls):
     super(TestPartitioning, cls).setup_class()
     map(cls.cleanup_db, cls.TEST_DBS)
-    cls.hdfs_client.delete_file_dir("test-warehouse/all_insert_partition_col_types/",\
+    cls.filesystem_client.delete_file_dir("test-warehouse/all_insert_partition_col_types/",\
         recursive=True)
 
   @classmethod
@@ -53,7 +53,6 @@ class TestPartitioning(ImpalaTestSuite):
     map(cls.cleanup_db, cls.TEST_DBS)
     super(TestPartitioning, cls).teardown_class()
 
-  @SkipIfS3.insert
   @SkipIfLocal.root_path
   @pytest.mark.execute_serially
   def test_partition_col_types(self, vector):
@@ -67,7 +66,6 @@ class TestPartitioning(ImpalaTestSuite):
   @SkipIfIsilon.hive
   @SkipIfLocal.hive
   @pytest.mark.execute_serially
-  @SkipIfS3.insert
   def test_boolean_partitions(self, vector):
     # This test takes about a minute to complete due to the Hive commands that are
     # executed. To cut down on runtime, limit the test to exhaustive exploration

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_queries.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_queries.py b/tests/query_test/test_queries.py
index 016c730..71fd21c 100644
--- a/tests/query_test/test_queries.py
+++ b/tests/query_test/test_queries.py
@@ -146,7 +146,6 @@ class TestQueriesTextTables(ImpalaTestSuite):
   def test_mixed_format(self, vector):
     self.run_test_case('QueryTest/mixed-format', vector)
 
-  @SkipIfS3.insert
   def test_values(self, vector):
     self.run_test_case('QueryTest/values', vector)
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index e64bf0c..15181fc 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -281,7 +281,6 @@ class TestParquet(ImpalaTestSuite):
     for scan_ranges_complete in scan_ranges_complete_list:
       assert int(scan_ranges_complete) == ranges_per_node
 
-  @SkipIfS3.insert
   def test_annotate_utf8_option(self, vector, unique_database):
     if self.exploration_strategy() != 'exhaustive': pytest.skip("Only run in exhaustive")
 
@@ -338,7 +337,6 @@ class TestParquet(ImpalaTestSuite):
     assert c_schema_elt.converted_type == ConvertedType.UTF8
     assert d_schema_elt.converted_type == None
 
-  @SkipIfS3.insert
   @SkipIfOldAggsJoins.nested_types
   def test_resolution_by_name(self, unique_database, vector):
     self.run_test_case('QueryTest/parquet-resolution-by-name', vector,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/shell/test_shell_commandline.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py
index d88bf11..e06f7eb 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -35,7 +35,6 @@ TEST_DB = "tmp_shell"
 TEST_TBL = "tbl1"
 QUERY_FILE_PATH = os.path.join(os.environ['IMPALA_HOME'], 'tests', 'shell')
 
-@SkipIfS3.insert
 class TestImpalaShell(object):
   """A set of sanity tests for the Impala shell commandline parameters.
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/util/filesystem_base.py
----------------------------------------------------------------------
diff --git a/tests/util/filesystem_base.py b/tests/util/filesystem_base.py
new file mode 100644
index 0000000..da9458b
--- /dev/null
+++ b/tests/util/filesystem_base.py
@@ -0,0 +1,61 @@
+# Copyright (c) 2016 Cloudera, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Filesystem access abstraction
+
+from abc import ABCMeta, abstractmethod
+
+class BaseFilesystem(object):
+  __metaclass__ = ABCMeta
+
+  @abstractmethod
+  def create_file(self, path, file_data, overwrite):
+    """Create a file in 'path' and populate with the string 'file_data'. If 
overwrite is
+    True, the file is overwritten. Returns True if successful, False if the 
file already
+    exists and throws an exception otherwise"""
+    pass
+
+  @abstractmethod
+  def make_dir(self, path, permission):
+    """Create a directory in 'path' with octal umask 'permission'.
+    Returns True if successful and throws an exception otherwise"""
+    pass
+
+  @abstractmethod
+  def copy(self, src, dst):
+    """Copy a file from 'src' to 'dst'. Throws an exception if unsuccessful."""
+    pass
+
+  @abstractmethod
+  def ls(self, path):
+    """Return a list of all files/dirs/keys in path. Throws an exception if 
path
+    is invalid."""
+    pass
+
+  @abstractmethod
+  def exists(self, path):
+    """Returns True if a particular path exists, else it returns False."""
+    pass
+
+  @abstractmethod
+  def delete_file_dir(self, path, recursive):
+    """Delete all files/dirs/keys in a path. Returns True if successful or if 
the file
+    does not exist. Throws an exception otherwise."""
+    pass
+
+  @abstractmethod
+  def get_all_file_sizes(self, path):
+    """Returns a list of integers which are all the file sizes of files found 
under
+    'path'."""
+    pass

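Any new filesystem backend only has to implement the seven abstract methods
above. As a rough sketch (not part of this patch; all names below are
hypothetical), a local-filesystem implementation might look like:

    import os
    import shutil

    from tests.util.filesystem_base import BaseFilesystem

    class LocalFilesystem(BaseFilesystem):
      def create_file(self, path, file_data, overwrite=True):
        if not overwrite and os.path.exists(path): return False
        with open(path, 'w') as f:
          f.write(file_data)
        return True

      def make_dir(self, path, permission=0o755):
        os.makedirs(path, permission)
        return True

      def copy(self, src, dst):
        shutil.copy(src, dst)

      def ls(self, path):
        return os.listdir(path)

      def exists(self, path):
        return os.path.exists(path)

      def delete_file_dir(self, path, recursive=False):
        if not os.path.exists(path): return True
        if recursive and os.path.isdir(path):
          shutil.rmtree(path)
        else:
          os.remove(path)
        return True

      def get_all_file_sizes(self, path):
        return [os.path.getsize(os.path.join(path, f)) for f in os.listdir(path)
                if os.path.isfile(os.path.join(path, f))]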
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/util/filesystem_utils.py
----------------------------------------------------------------------
diff --git a/tests/util/filesystem_utils.py b/tests/util/filesystem_utils.py
index 0f60a37..d22ae00 100644
--- a/tests/util/filesystem_utils.py
+++ b/tests/util/filesystem_utils.py
@@ -29,6 +29,9 @@ IS_DEFAULT_FS = not FILESYSTEM_PREFIX or IS_LOCAL
 # Isilon specific values.
 ISILON_WEBHDFS_PORT = 8082
 
+# S3 specific values
+S3_BUCKET_NAME = os.getenv("S3_BUCKET")
+
 def get_fs_path(path):
   return "%s%s" % (FILESYSTEM_PREFIX, path)
 

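S3_BUCKET_NAME presumably feeds the S3Client constructor when the suite runs
against S3. Hypothetical wiring (the actual client selection happens in the
test suite's setup code, which is not part of this diff):

    from tests.util.filesystem_utils import FILESYSTEM_PREFIX, S3_BUCKET_NAME
    from tests.util.s3_util import S3Client

    # Hypothetical: pick the S3 client when the prefix points at an s3a:// bucket.
    if FILESYSTEM_PREFIX.startswith('s3a://'):
      filesystem_client = S3Client(S3_BUCKET_NAME)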
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/util/hdfs_util.py
----------------------------------------------------------------------
diff --git a/tests/util/hdfs_util.py b/tests/util/hdfs_util.py
index 7ca83aa..7870666 100644
--- a/tests/util/hdfs_util.py
+++ b/tests/util/hdfs_util.py
@@ -18,12 +18,13 @@ from os import environ
 from os.path import join as join_path
 from pywebhdfs.webhdfs import PyWebHdfsClient, errors, _raise_pywebhdfs_exception
 from xml.etree.ElementTree import parse
+from tests.util.filesystem_base import BaseFilesystem
 import getpass
 import httplib
 import requests
 import types
 
-class PyWebHdfsClientWithChmod(PyWebHdfsClient):
+class PyWebHdfsClientWithChmod(PyWebHdfsClient, BaseFilesystem):
   def chmod(self, path, permission):
     """Set the permission of 'path' to 'permission' (specified as an octal 
string, e.g.
     '775'"""
@@ -61,9 +62,7 @@ class PyWebHdfsClientWithChmod(PyWebHdfsClient):
     Overrides the superclass's method by providing delete if exists semantics. This takes
     the burden of stat'ing the file away from the caller.
     """
-    try:
-      self.get_file_dir_status(path)
-    except Exception as e:
+    if not self.exists(path):
       return True
     return super(PyWebHdfsClientWithChmod, self).delete_file_dir(path,
         recursive=recursive)
@@ -77,6 +76,14 @@ class PyWebHdfsClientWithChmod(PyWebHdfsClient):
     path = path.lstrip('/')
     return super(PyWebHdfsClientWithChmod, self).get_file_dir_status(path)
 
+  def get_all_file_sizes(self, path):
+    """Returns a list of all file sizes in the path"""
+    sizes = []
+    for status in self.list_dir(path).get('FileStatuses').get('FileStatus'):
+      if status['type'] == 'FILE':
+        sizes += [status['length']]
+    return sizes
+
   def copy(self, src, dest):
     """Copies a file in hdfs from src to destination
 
@@ -92,8 +99,21 @@ class PyWebHdfsClientWithChmod(PyWebHdfsClient):
     self.create_file(dest, data)
     assert self.get_file_dir_status(dest)
     assert self.read_file(dest) == data
-    return True
 
+  def ls(self, path):
+    """Returns a list of all file and directory names in 'path'"""
+    # list_dir() returns a dictionary of file statuses. This function picks out the
+    # file and directory names and just returns a list of the names.
+    file_infos = self.list_dir(path).get('FileStatuses').get('FileStatus')
+    return [info.get('pathSuffix') for info in file_infos]
+
+  def exists(self, path):
+    """Checks if a particular path exists"""
+    try:
+      self.get_file_dir_status(path)
+    except errors.FileNotFound:
+      return False
+    return True
 
 class HdfsConfig(object):
   """Reads an XML configuration file (produced by a mini-cluster) into a 
dictionary
@@ -116,23 +136,9 @@ def get_hdfs_client_from_conf(conf):
   host, port = hostport.split(":")
   return get_hdfs_client(host=host, port=port)
 
-def _pyweb_hdfs_client_exists(self, path):
-  """The PyWebHdfsClient doesn't provide an API to cleanly detect if a file or 
directory
-  exists. This method is bound to each client that is created so tests can 
simply call
-  hdfs_client.exists('path') and get back a bool.
-  """
-  try:
-    self.get_file_dir_status(path)
-  except errors.FileNotFound:
-    return False
-  return True
-
 def get_hdfs_client(host, port, user_name=getpass.getuser()):
   """Returns a new HTTP client for an HDFS cluster using an explict host:port 
pair"""
-  hdfs_client = PyWebHdfsClientWithChmod(host=host, port=port, user_name=user_name)
-  # Bind our "exists" method to hdfs_client.exists
-  hdfs_client.exists = types.MethodType(_pyweb_hdfs_client_exists, hdfs_client)
-  return hdfs_client
+  return PyWebHdfsClientWithChmod(host=host, port=port, user_name=user_name)
 
 def get_default_hdfs_config():
   core_site_path = join_path(environ.get('HADOOP_CONF_DIR'), 'core-site.xml')

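With exists(), ls(), and get_all_file_sizes() now first-class methods on the
client, the MethodType binding removed above becomes unnecessary. Illustrative
usage (host, port, and paths are examples only):

    client = get_hdfs_client(host='localhost', port='50070')
    if client.exists('test-warehouse/t1'):
      print client.ls('test-warehouse/t1')                  # e.g. ['t1.txt']
      print client.get_all_file_sizes('test-warehouse/t1')  # e.g. [2]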
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed7f5ebf/tests/util/s3_util.py
----------------------------------------------------------------------
diff --git a/tests/util/s3_util.py b/tests/util/s3_util.py
new file mode 100644
index 0000000..42e44e8
--- /dev/null
+++ b/tests/util/s3_util.py
@@ -0,0 +1,99 @@
+# Copyright (c) 2016 Cloudera, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# S3 access utilities
+#
+# This file uses the boto3 client and provides simple functions to the Impala test suite
+# to access Amazon S3.
+
+import boto3
+import copy
+from tests.util.filesystem_base import BaseFilesystem
+
+class S3Client(BaseFilesystem):
+
+  def __init__(self, bucket):
+    self.bucketname = bucket
+    self.s3 = boto3.resource('s3')
+    self.bucket = self.s3.Bucket(self.bucketname)
+    self.s3client = boto3.client('s3')
+
+  def create_file(self, path, file_data, overwrite=True):
+    if not overwrite and self.exists(path): return False
+    self.s3client.put_object(Bucket=self.bucketname, Key=path, Body=file_data)
+    return True
+
+  def make_dir(self, path, permission=None):
+    # This function is a no-op. S3 is a key-value store and does not have a directory
+    # structure. We can use a non-existent path as though it already exists.
+    pass
+
+  def copy(self, src, dst):
+    self.s3client.copy_object(Bucket=self.bucketname,
+                              CopySource={'Bucket':self.bucketname, 'Key':src}, Key=dst)
+
+  # Since S3 is a key-value store, it does not have a command like 'ls' for a
+  # directory-structured filesystem. It lists everything under a path recursively.
+  # We have to manipulate its response to get an 'ls' like output.
+  def ls(self, path):
+    if not path.endswith('/'):
+      path += '/'
+    # Use '/' as a delimiter so that we don't get all keys under a path recursively.
+    response = self.s3client.list_objects(
+        Bucket=self.bucketname, Prefix=path, Delimiter='/')
+    dirs = []
+    # Non-keys or "directories" will be listed as 'Prefix' under 'CommonPrefixes'.
+    if 'CommonPrefixes' in response:
+      dirs = [t['Prefix'] for t in response['CommonPrefixes']]
+    files = []
+    # Keys or "files" will be listed as 'Key' under 'Contents'.
+    if 'Contents' in response:
+      files = [t['Key'] for t in response['Contents']]
+    files_and_dirs = []
+    files_and_dirs.extend([d.split('/')[-2] for d in dirs])
+    for f in files:
+      key = f.split("/")[-1]
+      if not key == '':
+        files_and_dirs += [key]
+    return files_and_dirs
+
+  def get_all_file_sizes(self, path):
+    if not path.endswith('/'):
+      path += '/'
+    # Use '/' as a delimiter so that we don't get all keys under a path recursively.
+    response = self.s3client.list_objects(
+        Bucket=self.bucketname, Prefix=path, Delimiter='/')
+    if 'Contents' in response:
+      return [t['Size'] for t in response['Contents']]
+    return []
+
+  def exists(self, path):
+    response = self.s3client.list_objects(Bucket=self.bucketname, Prefix=path)
+    return response.get('Contents') is not None
+
+  # Helper function which lists keys in a path. Should not be used by the tests directly.
+  def _list_keys(self, path):
+    if not self.exists(path):
+      return False
+    response = self.s3client.list_objects(Bucket=self.bucketname, Prefix=path)
+    contents = response.get('Contents')
+    return [c['Key'] for c in contents]
+
+  def delete_file_dir(self, path, recursive=False):
+    if not self.exists(path):
+      return True
+    objects = [{'Key': k} for k in self._list_keys(path)] if recursive else [{'Key': path}]
+    self.s3client.delete_objects(Bucket=self.bucketname, Delete={'Objects': objects})
+    return True

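A quick end-to-end sketch of the S3Client (the bucket name is a placeholder and
boto3 credentials are assumed to come from the environment):

    from tests.util.s3_util import S3Client

    client = S3Client('my-test-bucket')  # placeholder bucket name
    client.create_file('test-warehouse/t1/t1.txt', file_data='t1')
    assert client.exists('test-warehouse/t1/t1.txt')
    print client.ls('test-warehouse/t1')  # ['t1.txt']
    client.delete_file_dir('test-warehouse/t1', recursive=True)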