This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new ac87278  IMPALA-8950: Add -d, -f options to hdfs copyFromLocal, put, cp
ac87278 is described below

commit ac87278b169422091af1c03fcd2101516372defb
Author: Sahil Takiar <[email protected]>
AuthorDate: Wed Sep 18 15:22:07 2019 -0700

    IMPALA-8950: Add -d, -f options to hdfs copyFromLocal, put, cp
    
    Add the -d option and -f option to the following commands:
    
    `hdfs dfs -copyFromLocal <localsrc> URI`
    `hdfs dfs -put [ - | <localsrc1> .. ]. <dst>`
    `hdfs dfs -cp URI [URI ...] <dest>`
    
    The -d option "Skip[s] creation of temporary file with the suffix
    ._COPYING_.", which improves the performance of these commands on S3,
    since S3 does not support metadata-only renames.
    
    The -f option "Overwrites the destination if it already exists".
    Combined with HADOOP-13884, this avoids S3 consistency issues by
    skipping the HEAD request that checks whether the destination file
    already exists.
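    
    A minimal sketch of how the flags end up being assembled (simplified;
    the real helper in tests/util/hdfs_util.py wraps subprocess.Popen and
    asserts on the exit status, and 'hdfs_cp' is just an illustrative name):
    
        import subprocess
    
        def hdfs_cp(src, dst, overwrite=True):
          # -d: skip the ._COPYING_ temporary file (no extra copy/rename on S3)
          cp_cmd_params = ['-cp', '-d']
          # -f: overwrite, so no HEAD request is needed to check for 'dst'
          if overwrite: cp_cmd_params.append('-f')
          subprocess.check_call(['hdfs', 'dfs'] + cp_cmd_params + [src, dst])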
    
    Added the method 'copy_from_local' to the BaseFilesystem class.
    Refactored most usages of the aforementioned HDFS commands to use
    the filesystem_client. Some usages were not appropriate or worth
    refactoring, so occasionally this patch just adds the '-d' and '-f'
    options explicitly. All calls to '-put' were replaced with
    'copyFromLocal' because both copy files from the local fs to an
    HDFS-compatible target fs.
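    
    A sketch of what the refactor looks like at a typical call site (the
    function names here are illustrative, not part of the patch):
    
        from subprocess import check_call
    
        def copy_jar_old(src_path, dst_path):
          # Before: shell out by hand; '-f' passed explicitly, no '-d',
          # and the HDFS CLI is hard-coded.
          check_call(["hadoop", "fs", "-put", "-f", src_path, dst_path])
    
        def copy_jar_new(test_suite, src_path, dst_path):
          # After: copy_from_local adds '-d'/'-f' itself and works against
          # whichever filesystem the test suite is running on.
          test_suite.filesystem_client.copy_from_local(src_path, dst_path)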
    
    Since WebHDFS does not have good support for copying files, this patch
    removes the copy functionality from PyWebHdfsClientWithChmod.
    Refactored the hdfs_client so that it uses a DelegatingHdfsClient
    that delegates to either the HadoopFsCommandLineClient or the
    PyWebHdfsClientWithChmod.
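    
    Roughly, the delegation looks like this (a trimmed sketch; the full
    class is in tests/util/hdfs_util.py below):
    
        from tests.util.filesystem_base import BaseFilesystem
    
        class DelegatingHdfsClient(BaseFilesystem):
          def __init__(self, webhdfs_client, hdfs_filesystem_client):
            self.webhdfs_client = webhdfs_client                   # PyWebHdfsClientWithChmod
            self.hdfs_filesystem_client = hdfs_filesystem_client   # HadoopFsCommandLineClient
    
          def copy(self, src, dst, overwrite=False):
            # WebHDFS has no good copy support, so copies go through 'hdfs dfs'.
            self.hdfs_filesystem_client.copy(src, dst, overwrite)
    
          def ls(self, path):
            # Everything else stays on the (faster) WebHDFS client.
            return self.webhdfs_client.ls(path)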
    
    Testing:
    * Ran core tests on HDFS and S3
    
    Change-Id: I0d45db1c00554e6fb6bcc0b552596d86d4e30144
    Reviewed-on: http://gerrit.cloudera.org:8080/14311
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../QueryTest/parquet-resolution-by-name.test      |   6 +-
 tests/common/impala_test_suite.py                  |  50 +++++++--
 tests/custom_cluster/test_coordinators.py          |   8 +-
 .../test_hive_parquet_timestamp_conversion.py      |   4 +-
 tests/custom_cluster/test_insert_behaviour.py      |   9 +-
 .../custom_cluster/test_parquet_max_page_header.py |   2 +-
 tests/custom_cluster/test_udf_concurrency.py       |   2 +-
 tests/metadata/test_hidden_files.py                |  36 +++----
 tests/metadata/test_refresh_partition.py           |   5 +-
 tests/metadata/test_stale_metadata.py              |   2 +-
 tests/query_test/test_compressed_formats.py        |   9 +-
 tests/query_test/test_hdfs_file_mods.py            |   3 +-
 tests/query_test/test_insert_parquet.py            |   4 +-
 tests/query_test/test_multiple_filesystems.py      |   8 +-
 tests/query_test/test_nested_types.py              |   7 +-
 tests/query_test/test_scanners.py                  |   5 +-
 tests/query_test/test_scanners_fuzz.py             |   2 +-
 tests/query_test/test_udfs.py                      |  14 +--
 tests/util/adls_util.py                            |  14 +--
 tests/util/filesystem_base.py                      |   9 +-
 tests/util/hdfs_util.py                            | 117 +++++++++++++++------
 21 files changed, 206 insertions(+), 110 deletions(-)

diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-resolution-by-name.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-resolution-by-name.test
index bda36ed..d618932 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/parquet-resolution-by-name.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-resolution-by-name.test
@@ -72,9 +72,9 @@ string,string
 create table nested_resolution_by_name_test like functional_parquet.complextypestbl;
 ====
 ---- SHELL
-hadoop fs -cp $FILESYSTEM_PREFIX/test-warehouse/complextypestbl_parquet/nullable.parq \
+hdfs dfs -cp -d -f $FILESYSTEM_PREFIX/test-warehouse/complextypestbl_parquet/nullable.parq \
 $FILESYSTEM_PREFIX/test-warehouse/$DATABASE.db/nested_resolution_by_name_test/
-hadoop fs -cp $FILESYSTEM_PREFIX/test-warehouse/complextypestbl_parquet/nonnullable.parq \
+hdfs dfs -cp -d -f $FILESYSTEM_PREFIX/test-warehouse/complextypestbl_parquet/nonnullable.parq \
 $FILESYSTEM_PREFIX/test-warehouse/$DATABASE.db/nested_resolution_by_name_test/
 ====
 ---- QUERY
@@ -191,7 +191,7 @@ create table switched_map_fields_resolution_test (int_map map<string,int>)
 stored as parquet;
 ====
 ---- SHELL
-hadoop fs -copyFromLocal \
+hdfs dfs -copyFromLocal -d -f \
 $IMPALA_HOME/testdata/parquet_schema_resolution/switched_map.parq \
 $FILESYSTEM_PREFIX/test-warehouse/$DATABASE.db/switched_map_fields_resolution_test/
 ====
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index d23901e..645d30b 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -63,6 +63,7 @@ from tests.util.filesystem_utils import (
     IS_S3,
     IS_ABFS,
     IS_ADLS,
+    IS_HDFS,
     S3_BUCKET_NAME,
     S3GUARD_ENABLED,
     ADLS_STORE_NAME,
@@ -71,9 +72,10 @@ from tests.util.filesystem_utils import (
 
 from tests.util.hdfs_util import (
   HdfsConfig,
-  get_hdfs_client,
-  get_hdfs_client_from_conf,
+  get_webhdfs_client,
+  get_webhdfs_client_from_conf,
   NAMENODE,
+  DelegatingHdfsClient,
   HadoopFsCommandLineClient)
 from tests.util.test_file_parser import (
   QueryTestSectionReader,
@@ -173,11 +175,41 @@ class ImpalaTestSuite(BaseTestSuite):
 
     # Default query options are populated on demand.
     cls.default_query_options = {}
-
     cls.impalad_test_service = cls.create_impala_service()
-    cls.hdfs_client = cls.create_hdfs_client()
-    cls.filesystem_client = cls.hdfs_client
-    if IS_S3:
+
+    # There are multiple clients for interacting with the underlying storage service.
+    #
+    # There are two main types of clients: filesystem-specific clients and CLI clients.
+    # CLI clients all use the 'hdfs dfs' CLI to execute operations against a target
+    # filesystem.
+    #
+    # 'filesystem_client' is a generic interface for doing filesystem operations that
+    # works across all the filesystems that Impala supports. 'filesystem_client' uses
+    # either the HDFS command line (e.g. 'hdfs dfs'), a filesystem-specific library, or
+    # a wrapper around both, to implement common HDFS operations.
+    #
+    # *Test writers should always use 'filesystem_client' unless they are using filesystem
+    # specific functionality (e.g. HDFS ACLs).*
+    #
+    # The implementation of 'filesystem_client' for each filesystem is:
+    #     HDFS: uses a mixture of pywebhdfs (which is faster than the HDFS CLI) and the
+    #           HDFS CLI
+    #     S3:   uses the HDFS CLI
+    #     ABFS: uses the HDFS CLI
+    #     ADLS: uses a mixture of azure-data-lake-store-python and the HDFS CLI (TODO:
+    #           this should completely switch to the HDFS CLI once we test it)
+    #
+    # 'hdfs_client' is a HDFS-specific client library, and it only works when running on
+    # HDFS. When using 'hdfs_client', the test must be skipped on everything other than
+    # HDFS. This is only really useful for tests that do HDFS ACL operations. The
+    # 'hdfs_client' supports all the methods and functionality of the 'filesystem_client',
+    # with additional support for ACL operations such as chmod, chown, getacl, and setacl.
+    # 'hdfs_client' is set to None on non-HDFS systems.
+
+    if IS_HDFS:
+      cls.hdfs_client = cls.create_hdfs_client()
+      cls.filesystem_client = cls.hdfs_client
+    elif IS_S3:
       # S3Guard needs filesystem operations to go through the s3 connector. Use the
       # HDFS command line client.
       cls.filesystem_client = HadoopFsCommandLineClient("S3")
@@ -253,11 +285,11 @@ class ImpalaTestSuite(BaseTestSuite):
   @classmethod
   def create_hdfs_client(cls):
     if pytest.config.option.namenode_http_address is None:
-      hdfs_client = get_hdfs_client_from_conf(HDFS_CONF)
+      webhdfs_client = get_webhdfs_client_from_conf(HDFS_CONF)
     else:
       host, port = pytest.config.option.namenode_http_address.split(":")
-      hdfs_client = get_hdfs_client(host, port)
-    return hdfs_client
+      webhdfs_client = get_webhdfs_client(host, port)
+    return DelegatingHdfsClient(webhdfs_client, HadoopFsCommandLineClient())
 
   @classmethod
   def all_db_names(cls):
diff --git a/tests/custom_cluster/test_coordinators.py b/tests/custom_cluster/test_coordinators.py
index 997e7ed..0bf5164 100644
--- a/tests/custom_cluster/test_coordinators.py
+++ b/tests/custom_cluster/test_coordinators.py
@@ -149,7 +149,7 @@ class TestCoordinators(CustomClusterTestSuite):
 
     # copy jar with TestUpdateUdf (old) to tmp.jar
     check_call(["hadoop", "fs", "-mkdir", "-p", tgt_dir])
-    check_call(["hadoop", "fs", "-put", "-f", old_src_path, tgt_path])
+    self.filesystem_client.copy_from_local(old_src_path, tgt_path)
 
     coordinator = self.cluster.impalads[0]
     try:
@@ -173,7 +173,7 @@ class TestCoordinators(CustomClusterTestSuite):
       assert result.data == ['7300']
 
       # copy a new jar with TestUpdateUdf (new) and NewReplaceStringUdf to tmp.jar.
-      check_call(["hadoop", "fs", "-put", "-f", new_src_path, tgt_path])
+      self.filesystem_client.copy_from_local(new_src_path, tgt_path)
 
       # create a function for the updated TestUpdateUdf.
       create_new_fn = (
@@ -210,7 +210,7 @@ class TestCoordinators(CustomClusterTestSuite):
 
       # Copy jar to a new path.
       tgt_path_2 = tgt_dir + "/tmp2.jar"
-      check_call(["hadoop", "fs", "-put", "-f", old_src_path, tgt_path_2])
+      self.filesystem_client.copy_from_local(old_src_path, tgt_path_2)
 
       # Add the function.
       create_mismatch_fn = (
@@ -227,7 +227,7 @@ class TestCoordinators(CustomClusterTestSuite):
       # Overwrite the jar, giving it a new mtime. The sleep prevents the write to
       # happen too quickly so that its within mtime granularity (1 second).
       time.sleep(2)
-      check_call(["hadoop", "fs", "-put", "-f", new_src_path, tgt_path_2])
+      self.filesystem_client.copy_from_local(new_src_path, tgt_path_2)
 
       # Run the query. Expect the query fails due to mismatched libs at the
       # coordinator and one of the executors.
diff --git a/tests/custom_cluster/test_hive_parquet_timestamp_conversion.py b/tests/custom_cluster/test_hive_parquet_timestamp_conversion.py
index dab3918..35ac898 100644
--- a/tests/custom_cluster/test_hive_parquet_timestamp_conversion.py
+++ b/tests/custom_cluster/test_hive_parquet_timestamp_conversion.py
@@ -158,8 +158,8 @@ class TestHiveParquetTimestampConversion(CustomClusterTestSuite):
        "create table %s.t (i int, d timestamp) stored as parquet" % unique_database)
 
     tbl_loc = get_fs_path("/test-warehouse/%s.db/t" % unique_database)
-    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
-        "/testdata/data/hive_single_value_timestamp.parq", tbl_loc])
+    self.filesystem_client.copy_from_local(os.environ['IMPALA_HOME'] +
+        "/testdata/data/hive_single_value_timestamp.parq", tbl_loc)
 
     # TODO: other tests in this file could also use query option 'timezone' to enable
     #       real data validation
diff --git a/tests/custom_cluster/test_insert_behaviour.py b/tests/custom_cluster/test_insert_behaviour.py
index 7d3fb3e..29c3af6 100644
--- a/tests/custom_cluster/test_insert_behaviour.py
+++ b/tests/custom_cluster/test_insert_behaviour.py
@@ -20,7 +20,10 @@ import pytest
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfLocal
 from tests.util.filesystem_utils import IS_ISILON, WAREHOUSE
-from tests.util.hdfs_util import HdfsConfig, get_hdfs_client, get_hdfs_client_from_conf
+from tests.util.hdfs_util import (
+    HdfsConfig,
+    get_webhdfs_client,
+    get_webhdfs_client_from_conf)
 
 TEST_TBL = "insert_inherit_permission"
 
@@ -34,10 +37,10 @@ class TestInsertBehaviourCustomCluster(CustomClusterTestSuite):
     super(TestInsertBehaviourCustomCluster, cls).setup_class()
     if pytest.config.option.namenode_http_address is None:
       hdfs_conf = HdfsConfig(pytest.config.option.minicluster_xml_conf)
-      cls.hdfs_client = get_hdfs_client_from_conf(hdfs_conf)
+      cls.hdfs_client = get_webhdfs_client_from_conf(hdfs_conf)
     else:
       host, port = pytest.config.option.namenode_http_address.split(":")
-      cls.hdfs_client = get_hdfs_client(host, port)
+      cls.hdfs_client = get_webhdfs_client(host, port)
 
   def _check_partition_perms(self, part, perms):
     ls = self.hdfs_client.get_file_dir_status("test-warehouse/%s/%s" % (TEST_TBL, part))
diff --git a/tests/custom_cluster/test_parquet_max_page_header.py b/tests/custom_cluster/test_parquet_max_page_header.py
index e1c9e0e..d40e04e 100644
--- a/tests/custom_cluster/test_parquet_max_page_header.py
+++ b/tests/custom_cluster/test_parquet_max_page_header.py
@@ -93,7 +93,7 @@ class TestParquetMaxPageHeader(CustomClusterTestSuite):
         for i in xrange(self.MAX_STRING_LENGTH)])
     random_text2 = "".join([random.choice(string.letters)
         for i in xrange(self.MAX_STRING_LENGTH)])
-    put = subprocess.Popen(["hadoop", "fs", "-put", "-f", "-", file_name],
+    put = subprocess.Popen(["hdfs", "dfs", "-put", "-d", "-f", "-", file_name],
         stdin=subprocess.PIPE, bufsize=-1)
     put.stdin.write(random_text1 + "\n")
     put.stdin.write(random_text2)
diff --git a/tests/custom_cluster/test_udf_concurrency.py b/tests/custom_cluster/test_udf_concurrency.py
index dfcd6bb..8643ecf 100644
--- a/tests/custom_cluster/test_udf_concurrency.py
+++ b/tests/custom_cluster/test_udf_concurrency.py
@@ -159,7 +159,7 @@ class TestUdfConcurrency(CustomClusterTestSuite):
 
     # use a unique jar for this test to avoid interactions with other tests
     # that use the same jar
-    check_call(["hadoop", "fs", "-put", "-f", udf_src_path, udf_tgt_path])
+    self.filesystem_client.copy_from_local(udf_src_path, udf_tgt_path)
 
     # create all the functions.
     setup_client = self.create_impala_client()
diff --git a/tests/metadata/test_hidden_files.py b/tests/metadata/test_hidden_files.py
index a48531a..8454b83 100644
--- a/tests/metadata/test_hidden_files.py
+++ b/tests/metadata/test_hidden_files.py
@@ -61,35 +61,35 @@ class TestHiddenFiles(ImpalaTestSuite):
     ALLTYPES_LOC = "%s/alltypes" % WAREHOUSE
     TEST_TBL_LOC = "%s/%s.db/%s" % (WAREHOUSE, db_name, tbl_name)
     # Copy a visible file into one of the partitions.
-    check_call(["hadoop", "fs", "-cp",
+    self.filesystem_client.copy(
           "%s/year=2010/month=1/100101.txt" % ALLTYPES_LOC,
-          "%s/year=2010/month=1/100101.txt" % TEST_TBL_LOC], shell=False)
+          "%s/year=2010/month=1/100101.txt" % TEST_TBL_LOC, overwrite=True)
     # Add hidden files to the non-empty partition. Use upper case hidden suffixes.
-    check_call(["hadoop", "fs", "-cp",
+    self.filesystem_client.copy(
           "%s/year=2010/month=1/100101.txt" % ALLTYPES_LOC,
-          "%s/year=2010/month=1/.100101.txt" % TEST_TBL_LOC], shell=False)
-    check_call(["hadoop", "fs", "-cp",
+          "%s/year=2010/month=1/.100101.txt" % TEST_TBL_LOC, overwrite=True)
+    self.filesystem_client.copy(
           "%s/year=2010/month=1/100101.txt" % ALLTYPES_LOC,
-          "%s/year=2010/month=1/_100101.txt" % TEST_TBL_LOC], shell=False)
-    check_call(["hadoop", "fs", "-cp",
+          "%s/year=2010/month=1/_100101.txt" % TEST_TBL_LOC, overwrite=True)
+    self.filesystem_client.copy(
          "%s/year=2010/month=1/100101.txt" % ALLTYPES_LOC,
-          "%s/year=2010/month=1/100101.txt.COPYING" % TEST_TBL_LOC], shell=False)
-    check_call(["hadoop", "fs", "-cp",
+          "%s/year=2010/month=1/100101.txt.COPYING" % TEST_TBL_LOC, overwrite=True)
+    self.filesystem_client.copy(
           "%s/year=2010/month=1/100101.txt" % ALLTYPES_LOC,
-          "%s/year=2010/month=1/100101.txt.TMP" % TEST_TBL_LOC], shell=False)
+          "%s/year=2010/month=1/100101.txt.TMP" % TEST_TBL_LOC, overwrite=True)
     # Add hidden files to the empty partition. Use lower case hidden suffixes.
-    check_call(["hadoop", "fs", "-cp",
+    self.filesystem_client.copy(
           "%s/year=2010/month=2/100201.txt" % ALLTYPES_LOC,
-          "%s/year=2010/month=2/.100201.txt" % TEST_TBL_LOC], shell=False)
-    check_call(["hadoop", "fs", "-cp",
+          "%s/year=2010/month=2/.100201.txt" % TEST_TBL_LOC, overwrite=True)
+    self.filesystem_client.copy(
           "%s/year=2010/month=2/100201.txt" % ALLTYPES_LOC,
-          "%s/year=2010/month=2/_100201.txt" % TEST_TBL_LOC], shell=False)
-    check_call(["hadoop", "fs", "-cp",
+          "%s/year=2010/month=2/_100201.txt" % TEST_TBL_LOC, overwrite=True)
+    self.filesystem_client.copy(
          "%s/year=2010/month=2/100201.txt" % ALLTYPES_LOC,
-          "%s/year=2010/month=2/100201.txt.copying" % TEST_TBL_LOC], shell=False)
-    check_call(["hadoop", "fs", "-cp",
+          "%s/year=2010/month=2/100201.txt.copying" % TEST_TBL_LOC, overwrite=True)
+    self.filesystem_client.copy(
           "%s/year=2010/month=2/100201.txt" % ALLTYPES_LOC,
-          "%s/year=2010/month=2/100201.txt.tmp" % TEST_TBL_LOC], shell=False)
+          "%s/year=2010/month=2/100201.txt.tmp" % TEST_TBL_LOC, overwrite=True)
 
   def test_hidden_files_load(self, vector, unique_database):
     """Tests that an incremental refresh ignores hidden files."""
diff --git a/tests/metadata/test_refresh_partition.py b/tests/metadata/test_refresh_partition.py
index 67901da..58d142b 100644
--- a/tests/metadata/test_refresh_partition.py
+++ b/tests/metadata/test_refresh_partition.py
@@ -213,7 +213,7 @@ class TestRefreshPartition(ImpalaTestSuite):
     result = self.client.execute("select count(*) from %s" % table_name)
     assert result.data == [str(0)]
     dst_path = "%s/year=2010/month=1/%s" % (table_location, file_name)
-    check_call(["hadoop", "fs", "-cp", "-f", src_file, dst_path], shell=False)
+    self.filesystem_client.copy(src_file, dst_path, overwrite=True)
     # Check that data added is not visible before refresh
     result = self.client.execute("select count(*) from %s" % table_name)
     assert result.data == [str(0)]
@@ -250,8 +250,7 @@ class TestRefreshPartition(ImpalaTestSuite):
     assert result.data == [str(0)]
     dst_path = table_location + "/year=2010/month=%s/" + file_name
     for month in [1, 2]:
-        check_call(["hadoop", "fs", "-cp", "-f", src_file, dst_path % month],
-                   shell=False)
+        self.filesystem_client.copy(src_file, dst_path % month, overwrite=True)
     # Check that data added is not visible before refresh
     result = self.client.execute("select count(*) from %s" % table_name)
     assert result.data == [str(0)]
diff --git a/tests/metadata/test_stale_metadata.py b/tests/metadata/test_stale_metadata.py
index 9e5bf5f..223b454 100644
--- a/tests/metadata/test_stale_metadata.py
+++ b/tests/metadata/test_stale_metadata.py
@@ -136,4 +136,4 @@ class TestRewrittenFile(ImpalaTestSuite):
   def __copy_file_to_test_table(self, src_path, table_location):
     """Copies the provided path to the test table, overwriting any previous file."""
     dst_path = "%s/%s" % (table_location, self.FILE_NAME)
-    check_call(["hadoop", "fs", "-cp", "-f", src_path, dst_path], shell=False)
+    self.filesystem_client.copy(src_path, dst_path, overwrite=True)
diff --git a/tests/query_test/test_compressed_formats.py b/tests/query_test/test_compressed_formats.py
index 2748bd4..81320d6 100644
--- a/tests/query_test/test_compressed_formats.py
+++ b/tests/query_test/test_compressed_formats.py
@@ -71,13 +71,15 @@ class TestCompressedFormats(ImpalaTestSuite):
   def test_compressed_formats(self, vector):
     file_format = vector.get_value('file_format')
     extension, suffix = vector.get_value('compression_format')
-    if file_format in ['rc', 'seq', 'text']:
+    if file_format in ['rc', 'seq']:
       # TODO: How about LZO?
       # Test that {gzip,snappy,bzip,deflate}-compressed
       # {RC,sequence,text} files are supported.
       db_suffix = '_%s_%s' % (file_format, suffix)
       self._copy_and_query_compressed_file(
         'tinytable', db_suffix, suffix, '000000_0', extension)
+    elif file_format is 'text':
+        pytest.xfail('IMPALA-9004: TestCompressedFormats is broken for text files')
     else:
       assert False, "Unknown file_format: %s" % file_format
 
@@ -103,7 +105,7 @@ class TestCompressedFormats(ImpalaTestSuite):
 
     # Create the table
     self.run_stmt_in_hive(hive_cmd)
-    call(["hadoop", "fs", "-cp", src_file, dest_file])
+    self.filesystem_client.copy(src_file, dest_file, overwrite=True)
     # Try to read the compressed file with extension
     query = 'select count(*) from %s' % dest_table
     try:
@@ -112,6 +114,7 @@ class TestCompressedFormats(ImpalaTestSuite):
       result = self.execute_scalar(query)
       # Fail iff we expected an error
       assert expected_error is None, 'Query is expected to fail'
+      assert result and int(result) > 0
     except Exception as e:
       error_msg = str(e)
       print error_msg
@@ -207,7 +210,7 @@ class TestLargeCompressedFile(ImpalaTestSuite):
     # Total uncompressed size of a nested structure.
     total_chunk_size = num_blocks_per_chunk * payload_size
 
-    hdfs_put = subprocess.Popen(["hadoop", "fs", "-put", "-f", "-", file_name],
+    hdfs_put = subprocess.Popen(["hdfs", "dfs", "-put", "-d", "-f", "-", file_name],
         stdin=subprocess.PIPE, bufsize=-1)
     for i in range(num_chunks):
       hdfs_put.stdin.write(struct.pack('>i', total_chunk_size))
diff --git a/tests/query_test/test_hdfs_file_mods.py b/tests/query_test/test_hdfs_file_mods.py
index 30698e1..48e8995 100644
--- a/tests/query_test/test_hdfs_file_mods.py
+++ b/tests/query_test/test_hdfs_file_mods.py
@@ -56,7 +56,8 @@ class TestHdfsFileMods(ImpalaTestSuite):
     # Use HDFS commands to clone the table's files at the hdfs level
     old_table_location = "{0}/test-warehouse/tinytable".format(FILESYSTEM_PREFIX)
     call(["hdfs", "dfs", "-mkdir", new_table_location])
-    call(["hdfs", "dfs", "-cp", old_table_location + "/*", new_table_location])
+    self.filesystem_client.copy(old_table_location + "/*", new_table_location,
+        overwrite=True)
 
     # Create an external table with the new files (tinytable has two string columns)
     create_table = "create external table {0}.t1 (a string, b string) "\
diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py
index f224c73..ac647ed 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -333,8 +333,8 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
     file has the same column type metadata as the generated one."""
     hdfs_path = (os.environ['DEFAULT_FS'] + "/test-warehouse/{0}.db/"
                  "signed_integer_logical_types.parquet").format(unique_database)
-    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
-                '/testdata/data/signed_integer_logical_types.parquet', hdfs_path])
+    self.filesystem_client.copy_from_local(os.environ['IMPALA_HOME'] +
+        '/testdata/data/signed_integer_logical_types.parquet', hdfs_path)
     # Create table with signed integer logical types
     src_tbl = "{0}.{1}".format(unique_database, "read_write_logical_type_src")
     create_tbl_stmt = """create table {0} like parquet "{1}"
diff --git a/tests/query_test/test_multiple_filesystems.py b/tests/query_test/test_multiple_filesystems.py
index 2e97821..91abe53 100644
--- a/tests/query_test/test_multiple_filesystems.py
+++ b/tests/query_test/test_multiple_filesystems.py
@@ -49,10 +49,10 @@ class TestMultipleFilesystems(ImpalaTestSuite):
     call(["hadoop", "fs", "-mkdir", get_secondary_fs_path("/multi_fs_tests/")], shell=False)
     check_call(["hadoop", "fs", "-mkdir",
                 get_secondary_fs_path("/multi_fs_tests/%s.db/" % db_name)], shell=False)
-    check_call(["hadoop", "fs", "-cp", "/test-warehouse/alltypes_parquet/",
-                get_secondary_fs_path("/multi_fs_tests/%s.db/" % db_name)], shell=False)
-    check_call(["hadoop", "fs", "-cp", "/test-warehouse/tinytable/",
-                get_secondary_fs_path("/multi_fs_tests/%s.db/" % db_name)], shell=False)
+    self.filesystem_client.copy("/test-warehouse/alltypes_parquet/",
+        get_secondary_fs_path("/multi_fs_tests/%s.db/" % db_name), overwrite=True)
+    self.filesystem_client.copy("/test-warehouse/tinytable/", get_secondary_fs_path(
+        "/multi_fs_tests/%s.db/" % db_name), overwrite=True)
 
   @pytest.mark.execute_serially
   def test_multiple_filesystems(self, vector, unique_database):
diff --git a/tests/query_test/test_nested_types.py b/tests/query_test/test_nested_types.py
index 779af4a..d1e2719 100644
--- a/tests/query_test/test_nested_types.py
+++ b/tests/query_test/test_nested_types.py
@@ -552,7 +552,6 @@ class TestParquetArrayEncodings(ImpalaTestSuite):
   # ...f22 = 220
   # ..F11 = 110
   # ..F12 = 120
-  @SkipIfS3.eventually_consistent
   def test_ambiguous_list(self, vector, unique_database):
     """IMPALA-4725: Tests the schema-resolution behavior with different values for the
     PARQUET_ARRAY_RESOLUTION and PARQUET_FALLBACK_SCHEMA_RESOLUTION query options.
@@ -589,7 +588,7 @@ class TestParquetArrayEncodings(ImpalaTestSuite):
     self.client.execute("create table %s.%s (%s) stored as parquet location '%s'" %
                         (dbname, tablename, columns, location))
     local_path = self.TESTFILE_DIR + "/" + filename
-    check_call(["hadoop", "fs", "-put", local_path, location], shell=False)
+    self.filesystem_client.copy_from_local(local_path, location)
 
 class TestMaxNestingDepth(ImpalaTestSuite):
   # Should be kept in sync with the FE's Type.MAX_NESTING_DEPTH
@@ -623,9 +622,9 @@ class TestMaxNestingDepth(ImpalaTestSuite):
   def __create_parquet_tables(self, unique_database, as_target=True):
     """Create Parquet tables from files. If 'as_target' is False, the Parquet tables will
      be used to create ORC tables, so we add a suffix in the table names."""
-    check_call(["hdfs", "dfs", "-copyFromLocal",
+    self.filesystem_client.copy_from_local(
       "%s/testdata/max_nesting_depth" % os.environ['IMPALA_HOME'],
-      "%s/%s.db/" % (WAREHOUSE, unique_database)], shell=False)
+      "%s/%s.db/" % (WAREHOUSE, unique_database))
     tbl_suffix = '' if as_target else self.TEMP_TABLE_SUFFIX
     for tbl in self.TABLES:
       tbl_name = "%s.%s_tbl%s" % (unique_database, tbl, tbl_suffix)
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 198ae4c..5b17ecb 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -714,7 +714,6 @@ class TestParquet(ImpalaTestSuite):
     assert c_schema_elt.converted_type == ConvertedType.UTF8
     assert d_schema_elt.converted_type == None
 
-  @SkipIfS3.eventually_consistent
   def test_resolution_by_name(self, vector, unique_database):
     self.run_test_case('QueryTest/parquet-resolution-by-name', vector,
                        use_db=unique_database)
@@ -1117,7 +1116,7 @@ class TestTextSplitDelimiters(ImpalaTestSuite):
     with tempfile.NamedTemporaryFile() as f:
       f.write(data)
       f.flush()
-      check_call(['hadoop', 'fs', '-copyFromLocal', f.name, location])
+      self.filesystem_client.copy_from_local(f.name, location)
     self.client.execute("refresh %s" % qualified_table_name);
 
     vector.get_value('exec_option')['max_scan_range_length'] = max_scan_range_length
@@ -1270,7 +1269,7 @@ class TestOrc(ImpalaTestSuite):
     tbl_loc = get_fs_path("/test-warehouse/%s.db/%s" % (db, tbl))
     # set block size to 156672 so lineitem_threeblocks.orc occupies 3 blocks,
     # lineitem_sixblocks.orc occupies 6 blocks.
-    check_call(['hdfs', 'dfs', '-Ddfs.block.size=156672', '-copyFromLocal',
+    check_call(['hdfs', 'dfs', '-Ddfs.block.size=156672', '-copyFromLocal', '-d', '-f',
         os.environ['IMPALA_HOME'] + "/testdata/LineItemMultiBlock/" + file, tbl_loc])
 
   def _misaligned_orc_stripes_helper(
diff --git a/tests/query_test/test_scanners_fuzz.py b/tests/query_test/test_scanners_fuzz.py
index a7a578c..1a505ca 100644
--- a/tests/query_test/test_scanners_fuzz.py
+++ b/tests/query_test/test_scanners_fuzz.py
@@ -176,7 +176,7 @@ class TestScannersFuzzing(ImpalaTestSuite):
     # Copy all of the local files and directories to hdfs.
     to_copy = ["%s/%s" % (tmp_table_dir, file_or_dir)
                for file_or_dir in os.listdir(tmp_table_dir)]
-    check_call(['hdfs', 'dfs', '-copyFromLocal', '-d'] + to_copy + [fuzz_table_location])
+    self.filesystem_client.copy_from_local(to_copy, fuzz_table_location)
 
     if "SCANNER_FUZZ_KEEP_FILES" not in os.environ:
       shutil.rmtree(tmp_table_dir)
diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py
index 3839044..8572db8 100644
--- a/tests/query_test/test_udfs.py
+++ b/tests/query_test/test_udfs.py
@@ -338,7 +338,7 @@ class TestUdfExecution(TestUdfBase):
     try:
       with open(dir_name, "w") as f:
         f.write("Hello World")
-      check_call(["hadoop", "fs", "-put", "-f", f.name, hdfs_path])
+      self.filesystem_client.copy_from_local(f.name, hdfs_path)
       if vector.get_value('exec_option')['disable_codegen']:
         self.run_test_case('QueryTest/udf-errors', vector, use_db=unique_database)
     finally:
@@ -436,7 +436,7 @@ class TestUdfTargeted(TestUdfBase):
             unique_database, tgt_udf_path))
     query = "select `{0}`.fn_invalid_symbol('test')".format(unique_database)
 
-    check_call(["hadoop", "fs", "-put", "-f", src_udf_path, tgt_udf_path])
+    self.filesystem_client.copy_from_local(src_udf_path, tgt_udf_path)
     self.client.execute(drop_fn_stmt)
     self.client.execute(create_fn_stmt)
     for _ in xrange(5):
@@ -471,7 +471,7 @@ class TestUdfTargeted(TestUdfBase):
     jar_path = get_fs_path("/test-warehouse/{0}.db/".format(unique_database)
                            + get_random_id(5) + ".jar")
     hive_jar = get_fs_path("/test-warehouse/hive-exec.jar")
-    check_call(["hadoop", "fs", "-cp", hive_jar, jar_path])
+    self.filesystem_client.copy(hive_jar, jar_path)
     drop_fn_stmt = (
         "drop function if exists "
         "`{0}`.`pi_missing_jar`()".format(unique_database))
@@ -528,14 +528,14 @@ class TestUdfTargeted(TestUdfBase):
     query_stmt = "select `{0}`.`udf_update_test_drop`()".format(unique_database)
 
     # Put the old UDF binary on HDFS, make the UDF in Impala and run it.
-    check_call(["hadoop", "fs", "-put", "-f", old_udf, udf_dst])
+    self.filesystem_client.copy_from_local(old_udf, udf_dst)
     self.execute_query_expect_success(self.client, drop_fn_stmt, exec_options)
     self.execute_query_expect_success(self.client, create_fn_stmt, exec_options)
     self._run_query_all_impalads(exec_options, query_stmt, ["Old UDF"])
 
     # Update the binary, drop and create the function again. The new binary should
     # be running.
-    check_call(["hadoop", "fs", "-put", "-f", new_udf, udf_dst])
+    self.filesystem_client.copy_from_local(new_udf, udf_dst)
     self.execute_query_expect_success(self.client, drop_fn_stmt, exec_options)
     self.execute_query_expect_success(self.client, create_fn_stmt, exec_options)
     self._run_query_all_impalads(exec_options, query_stmt, ["New UDF"])
@@ -568,7 +568,7 @@ class TestUdfTargeted(TestUdfBase):
     query_template = "select `{0}`.`{{0}}`()".format(unique_database)
 
     # Put the old UDF binary on HDFS, make the UDF in Impala and run it.
-    check_call(["hadoop", "fs", "-put", "-f", old_udf, udf_dst])
+    self.filesystem_client.copy_from_local(old_udf, udf_dst)
     self.execute_query_expect_success(
         self.client, create_fn_template.format(old_function_name), exec_options)
     self._run_query_all_impalads(
@@ -576,7 +576,7 @@ class TestUdfTargeted(TestUdfBase):
 
     # Update the binary, and create a new function using the binary. The new binary
     # should be running.
-    check_call(["hadoop", "fs", "-put", "-f", new_udf, udf_dst])
+    self.filesystem_client.copy_from_local(new_udf, udf_dst)
     self.execute_query_expect_success(
         self.client, create_fn_template.format(new_function_name), exec_options)
     self._run_query_all_impalads(
diff --git a/tests/util/adls_util.py b/tests/util/adls_util.py
index b72b4c1..3b9ea0f 100644
--- a/tests/util/adls_util.py
+++ b/tests/util/adls_util.py
@@ -23,6 +23,7 @@
 from azure.datalake.store import core, lib, multithread, exceptions
 from tests.util.filesystem_base import BaseFilesystem
 from tests.util.filesystem_utils import ADLS_CLIENT_ID, ADLS_TENANT_ID, ADLS_CLIENT_SECRET
+from tests.util.hdfs_util import HadoopFsCommandLineClient
 
 class ADLSClient(BaseFilesystem):
 
@@ -31,6 +32,7 @@ class ADLSClient(BaseFilesystem):
                           client_secret = ADLS_CLIENT_SECRET,
                           client_id = ADLS_CLIENT_ID)
     self.adlsclient = core.AzureDLFileSystem(self.token, store_name=store)
+    self.adls_cli_client = HadoopFsCommandLineClient("ADLS")
 
   def create_file(self, path, file_data, overwrite=True):
     if not overwrite and self.exists(path): return False
@@ -43,13 +45,11 @@ class ADLSClient(BaseFilesystem):
     self.adlsclient.mkdir(path)
     return True
 
-  def copy(self, src, dst):
-    # The ADLS Python client doesn't support cp() yet, so we have to download and
-    # reupload to the destination.
-    src_contents = self.adlsclient.cat(src)
-    self.create_file(dst, src_contents, overwrite=True)
-    assert self.exists(dst), \
-        'ADLS copy failed: Destination file {dst} does not exist'.format(dst=dst)
+  def copy(self, src, dst, overwrite=True):
+    self.adls_cli_client.copy(src, dst, overwrite)
+
+  def copy_from_local(self, src, dst):
+    self.adls_cli_client.copy_from_local(src, dst)
 
   def ls(self, path):
     file_paths = self.adlsclient.ls(path)
diff --git a/tests/util/filesystem_base.py b/tests/util/filesystem_base.py
index 9396a2e..f744530 100644
--- a/tests/util/filesystem_base.py
+++ b/tests/util/filesystem_base.py
@@ -36,11 +36,18 @@ class BaseFilesystem(object):
     pass
 
   @abstractmethod
-  def copy(self, src, dst):
+  def copy(self, src, dst, overwrite):
     """Copy a file from 'src' to 'dst'. Throws an exception if unsuccessful."""
     pass
 
   @abstractmethod
+  def copy_from_local(self, src, dst):
+    """Copies a file from 'src' file on the local filesystem to the 'dst', which can be
+    on any HDFS compatible filesystem. Fails if the src file is not on the local
+    filesystem. Throws an exception if unsuccessful."""
+    pass
+
+  @abstractmethod
   def ls(self, path):
     """Return a list of all files/dirs/keys in path. Throws an exception if path
     is invalid."""
diff --git a/tests/util/hdfs_util.py b/tests/util/hdfs_util.py
index f015f1f..b22dec4 100644
--- a/tests/util/hdfs_util.py
+++ b/tests/util/hdfs_util.py
@@ -55,7 +55,61 @@ CORE_CONF = HdfsConfig(os.path.join(environ['HADOOP_CONF_DIR'], "core-site.xml")
 NAMENODE = FILESYSTEM_PREFIX or CORE_CONF.get('fs.defaultFS')
 
 
-class PyWebHdfsClientWithChmod(PyWebHdfsClient, BaseFilesystem):
+class DelegatingHdfsClient(BaseFilesystem):
+  """HDFS client that either delegates to a PyWebHdfsClientWithChmod or a
+  HadoopFsCommandLineClient. Since PyWebHdfsClientWithChmod does not have good support
+  for copying files, this class delegates to HadoopFsCommandLineClient for all copy
+  operations."""
+
+  def __init__(self, webhdfs_client, hdfs_filesystem_client):
+    self.webhdfs_client = webhdfs_client
+    self.hdfs_filesystem_client = hdfs_filesystem_client
+    super(DelegatingHdfsClient, self).__init__()
+
+  def create_file(self, path, file_data, overwrite=True):
+    return self.webhdfs_client.create_file(path, file_data, overwrite=overwrite)
+
+  def make_dir(self, path, permission=None):
+    if permission:
+      return self.webhdfs_client.make_dir(path, permission=permission)
+    else:
+      return self.webhdfs_client.make_dir(path)
+
+  def copy(self, src, dst, overwrite=False):
+    self.hdfs_filesystem_client.copy(src, dst, overwrite)
+
+  def copy_from_local(self, src, dst):
+    self.hdfs_filesystem_client.copy_from_local(src, dst)
+
+  def ls(self, path):
+    return self.webhdfs_client.ls(path)
+
+  def exists(self, path):
+    return self.webhdfs_client.exists(path)
+
+  def delete_file_dir(self, path, recursive=False):
+    return self.webhdfs_client.delete_file_dir(path, recursive=recursive)
+
+  def get_file_dir_status(self, path):
+    return self.webhdfs_client.get_file_dir_status(path)
+
+  def get_all_file_sizes(self, path):
+    return self.webhdfs_client.get_all_file_sizes(path)
+
+  def chmod(self, path, permission):
+    return self.webhdfs_client.chmod(path, permission)
+
+  def chown(self, path, user, group):
+    return self.webhdfs_client.chown(path, user, group)
+
+  def setacl(self, path, acls):
+    return self.webhdfs_client.setacl(path, acls)
+
+  def getacl(self, path):
+    return self.webhdfs_client.getacl(path)
+
+
+class PyWebHdfsClientWithChmod(PyWebHdfsClient):
   def chmod(self, path, permission):
     """Set the permission of 'path' to 'permission' (specified as an octal string, e.g.
     '775'"""
@@ -115,22 +169,6 @@ class PyWebHdfsClientWithChmod(PyWebHdfsClient, BaseFilesystem):
         sizes += [status['length']]
     return sizes
 
-  def copy(self, src, dest):
-    """Copies a file in hdfs from src to destination
-
-    Simulates hdfs dfs (or hadoop fs) -cp <src> <dst>. Does not resolve all the files if
-    the source or destination is a directory. Files need to be explicitly copied.
-    TODO: Infer whether the source or destination is a directory and do this implicitly.
-    TODO: Take care of larger files by always reading/writing them in small chunks.
-    """
-    assert self.get_file_dir_status(src)
-    # Get the data
-    data = self.read_file(src)
-    # Copy the data
-    self.create_file(dest, data)
-    assert self.get_file_dir_status(dest)
-    assert self.read_file(dest) == data
-
   def ls(self, path):
     """Returns a list of all file and directory names in 'path'"""
     # list_dir() returns a dictionary of file statues. This function picks out the
@@ -161,8 +199,8 @@ class HadoopFsCommandLineClient(BaseFilesystem):
     super(HadoopFsCommandLineClient, self).__init__()
 
   def _hadoop_fs_shell(self, command):
-    """Helper function wrapper around 'hadoop fs' takes in the arguments as a list."""
-    hadoop_command = ['hadoop', 'fs'] + command
+    """Helper function wrapper around 'hdfs dfs' takes in the arguments as a list."""
+    hadoop_command = ['hdfs', 'dfs'] + command
     process = subprocess.Popen(hadoop_command,
           stdout=subprocess.PIPE, stderr=subprocess.PIPE)
     stdout, stderr = process.communicate()
@@ -176,8 +214,10 @@ class HadoopFsCommandLineClient(BaseFilesystem):
     if not overwrite and self.exists(fixed_path): return False
     with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
       tmp_file.write(file_data)
-    (status, stdout, stderr) = \
-        self._hadoop_fs_shell(['-put', tmp_file.name, fixed_path])
+    put_cmd_params = ['-put', '-d']
+    if overwrite: put_cmd_params.append('-f')
+    put_cmd_params.extend([tmp_file.name, fixed_path])
+    (status, stdout, stderr) = self._hadoop_fs_shell(put_cmd_params)
     return status == 0
 
   def make_dir(self, path, permission=None):
@@ -186,18 +226,35 @@ class HadoopFsCommandLineClient(BaseFilesystem):
     (status, stdout, stderr) = self._hadoop_fs_shell(['-mkdir', '-p', fixed_path])
     return status == 0
 
-  def copy(self, src, dst):
-    """Copy the source file to the destination."""
+  def copy(self, src, dst, overwrite):
+    """Copy the source file to the destination. Specifes the '-d' option by default, which
+    'Skip[s] creation of temporary file with the suffix ._COPYING_.' to avoid extraneous
+    copies on S3. If overwrite is true, the destination file is overwritten, set to false
+    by default for backwards compatibility."""
     fixed_src = self._normalize_path(src)
     fixed_dst = self._normalize_path(dst)
-    (status, stdout, stderr) = \
-        self._hadoop_fs_shell(['-cp', fixed_src, fixed_dst])
+    cp_cmd_params = ['-cp', '-d']
+    if overwrite: cp_cmd_params.append('-f')
+    cp_cmd_params.extend([fixed_src, fixed_dst])
+    (status, stdout, stderr) = self._hadoop_fs_shell(cp_cmd_params)
     assert status == 0, \
         '{0} copy failed: '.format(self.filesystem_type) + stderr + "; " + stdout
     assert self.exists(dst), \
         '{fs_type} copy failed: Destination file {dst} does not exist'\
             .format(fs_type=self.filesystem_type, dst=dst)
 
+  def copy_from_local(self, src, dst):
+    """Wrapper around 'hdfs dfs -copyFromLocal [-f] [-p] [-l] [-d] <localsrc> ... <dst>'.
+    Overwrites files by default to avoid S3 consistency issues. Specifes the '-d' option
+    by default, which 'Skip[s] creation of temporary file with the suffix ._COPYING_.' to
+    avoid extraneous copies on S3. 'src' must be either a string or a list of strings."""
+    assert isinstance(src, list) or isinstance(src, basestring)
+    src_list = src if isinstance(src, list) else [src]
+    (status, stdout, stderr) = self._hadoop_fs_shell(['-copyFromLocal', '-d', '-f'] +
+        src_list + [dst])
+    assert status == 0, '{0} copy from {1} to {2} failed: '.format(self.filesystem_type,
+        src, dst) + stderr + '; ' + stdout
+
   def _inner_ls(self, path):
     """List names, lengths, and mode for files/directories under the specified path."""
     fixed_path = self._normalize_path(path)
@@ -253,16 +310,16 @@ class HadoopFsCommandLineClient(BaseFilesystem):
     return path if path.startswith('/') else '/' + path
 
 
-def get_hdfs_client_from_conf(conf):
+def get_webhdfs_client_from_conf(conf):
   """Returns a new HTTP client for an HDFS cluster using an HdfsConfig object"""
   hostport = conf.get('dfs.namenode.http-address')
   if hostport is None:
     raise Exception("dfs.namenode.http-address not found in config")
   host, port = hostport.split(":")
-  return get_hdfs_client(host=host, port=port)
+  return get_webhdfs_client(host=host, port=port)
 
 
-def get_hdfs_client(host, port, user_name=getpass.getuser()):
+def get_webhdfs_client(host, port, user_name=getpass.getuser()):
   """Returns a new HTTP client for an HDFS cluster using an explict host:port pair"""
   return PyWebHdfsClientWithChmod(host=host, port=port, user_name=user_name)
 
@@ -271,7 +328,3 @@ def get_default_hdfs_config():
   core_site_path = os.path.join(environ.get('HADOOP_CONF_DIR'), 'core-site.xml')
   hdfs_site_path = os.path.join(environ.get('HADOOP_CONF_DIR'), 'hdfs-site.xml')
   return HdfsConfig(core_site_path, hdfs_site_path)
-
-
-def create_default_hdfs_client():
-  return get_hdfs_client_from_conf(get_default_hdfs_config())
