This is an automated email from the ASF dual-hosted git repository.
stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new ac87278 IMPALA-8950: Add -d, -f options to hdfs copyFromLocal, put, cp
ac87278 is described below
commit ac87278b169422091af1c03fcd2101516372defb
Author: Sahil Takiar <[email protected]>
AuthorDate: Wed Sep 18 15:22:07 2019 -0700
IMPALA-8950: Add -d, -f options to hdfs copyFromLocal, put, cp
Add the -d option and -f option to the following commands:
`hdfs dfs -copyFromLocal <localsrc> URI`
`hdfs dfs -put [ - | <localsrc1> .. ]. <dst>`
`hdfs dfs -cp URI [URI ...] <dest>`
The -d option "Skip[s] creation of temporary file with the suffix
._COPYING_." which improves performance of these commands on S3 since S3
does not support metadata only renames.
The -f option "Overwrites the destination if it already exists" combined
with HADOOP-13884 this improves issues seen with S3 consistency issues by
avoiding a HEAD request to check if the destination file exists or not.
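For illustration only (not part of this patch's diff), a minimal sketch
of the flag combination; the paths here are hypothetical:

    # '-d' skips the ._COPYING_ temporary file, '-f' overwrites the destination.
    from subprocess import check_call
    check_call(["hdfs", "dfs", "-copyFromLocal", "-d", "-f",
                "/tmp/example.parq", "s3a://example-bucket/test-warehouse/example/"])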
Added the method 'copy_from_local' to the BaseFilesystem class.
Refactored most usages of the aforementioned HDFS commands to use
the filesystem_client. Some usages were not appropriate or worth
refactoring, so in those cases this patch just adds the '-d' and '-f'
options explicitly. All calls to '-put' were replaced with
'copyFromLocal' because both copy files from the local fs to an
HDFS-compatible target fs.
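For example, a typical call site changes roughly as sketched below; the
jar paths are hypothetical, and the real call sites are in the diff
that follows:

    from tests.util.hdfs_util import HadoopFsCommandLineClient

    client = HadoopFsCommandLineClient()  # tests normally use self.filesystem_client
    # Before: check_call(["hadoop", "fs", "-put", "-f", local_jar, tgt_path])
    # After: the client adds '-d' and '-f' itself.
    client.copy_from_local("/tmp/udf.jar", "/test-warehouse/udf.jar")
    client.copy("/test-warehouse/udf.jar", "/test-warehouse/udf-copy.jar", overwrite=True)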
Since WebHDFS does not have good support for copying files, this patch
removes the copy functionality from the PyWebHdfsClientWithChmod.
Refactored the hdfs_client so that it uses a DelegatingHdfsClient
that delegates to either the HadoopFsCommandLineClient or
PyWebHdfsClientWithChmod.
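A rough sketch of the resulting wiring (the host/port and paths are
assumptions; the actual setup lives in create_hdfs_client() in the
diff below):

    from tests.util.hdfs_util import (DelegatingHdfsClient, HadoopFsCommandLineClient,
                                      get_webhdfs_client)

    webhdfs = get_webhdfs_client(host="localhost", port="5070")  # assumed minicluster address
    hdfs_client = DelegatingHdfsClient(webhdfs, HadoopFsCommandLineClient())
    hdfs_client.copy_from_local("/tmp/data.parq", "/test-warehouse/t/")  # via 'hdfs dfs'
    hdfs_client.chmod("/test-warehouse/t/data.parq", "775")  # via WebHDFS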
Testing:
* Ran core tests on HDFS and S3
Change-Id: I0d45db1c00554e6fb6bcc0b552596d86d4e30144
Reviewed-on: http://gerrit.cloudera.org:8080/14311
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
.../QueryTest/parquet-resolution-by-name.test | 6 +-
tests/common/impala_test_suite.py | 50 +++++++--
tests/custom_cluster/test_coordinators.py | 8 +-
.../test_hive_parquet_timestamp_conversion.py | 4 +-
tests/custom_cluster/test_insert_behaviour.py | 9 +-
.../custom_cluster/test_parquet_max_page_header.py | 2 +-
tests/custom_cluster/test_udf_concurrency.py | 2 +-
tests/metadata/test_hidden_files.py | 36 +++----
tests/metadata/test_refresh_partition.py | 5 +-
tests/metadata/test_stale_metadata.py | 2 +-
tests/query_test/test_compressed_formats.py | 9 +-
tests/query_test/test_hdfs_file_mods.py | 3 +-
tests/query_test/test_insert_parquet.py | 4 +-
tests/query_test/test_multiple_filesystems.py | 8 +-
tests/query_test/test_nested_types.py | 7 +-
tests/query_test/test_scanners.py | 5 +-
tests/query_test/test_scanners_fuzz.py | 2 +-
tests/query_test/test_udfs.py | 14 +--
tests/util/adls_util.py | 14 +--
tests/util/filesystem_base.py | 9 +-
tests/util/hdfs_util.py | 117 +++++++++++++++------
21 files changed, 206 insertions(+), 110 deletions(-)
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-resolution-by-name.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-resolution-by-name.test
index bda36ed..d618932 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/parquet-resolution-by-name.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-resolution-by-name.test
@@ -72,9 +72,9 @@ string,string
create table nested_resolution_by_name_test like
functional_parquet.complextypestbl;
====
---- SHELL
-hadoop fs -cp $FILESYSTEM_PREFIX/test-warehouse/complextypestbl_parquet/nullable.parq \
+hdfs dfs -cp -d -f $FILESYSTEM_PREFIX/test-warehouse/complextypestbl_parquet/nullable.parq \
$FILESYSTEM_PREFIX/test-warehouse/$DATABASE.db/nested_resolution_by_name_test/
-hadoop fs -cp $FILESYSTEM_PREFIX/test-warehouse/complextypestbl_parquet/nonnullable.parq \
+hdfs dfs -cp -d -f $FILESYSTEM_PREFIX/test-warehouse/complextypestbl_parquet/nonnullable.parq \
$FILESYSTEM_PREFIX/test-warehouse/$DATABASE.db/nested_resolution_by_name_test/
====
---- QUERY
@@ -191,7 +191,7 @@ create table switched_map_fields_resolution_test (int_map
map<string,int>)
stored as parquet;
====
---- SHELL
-hadoop fs -copyFromLocal \
+hdfs dfs -copyFromLocal -d -f \
$IMPALA_HOME/testdata/parquet_schema_resolution/switched_map.parq \
$FILESYSTEM_PREFIX/test-warehouse/$DATABASE.db/switched_map_fields_resolution_test/
====
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index d23901e..645d30b 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -63,6 +63,7 @@ from tests.util.filesystem_utils import (
IS_S3,
IS_ABFS,
IS_ADLS,
+ IS_HDFS,
S3_BUCKET_NAME,
S3GUARD_ENABLED,
ADLS_STORE_NAME,
@@ -71,9 +72,10 @@ from tests.util.filesystem_utils import (
from tests.util.hdfs_util import (
HdfsConfig,
- get_hdfs_client,
- get_hdfs_client_from_conf,
+ get_webhdfs_client,
+ get_webhdfs_client_from_conf,
NAMENODE,
+ DelegatingHdfsClient,
HadoopFsCommandLineClient)
from tests.util.test_file_parser import (
QueryTestSectionReader,
@@ -173,11 +175,41 @@ class ImpalaTestSuite(BaseTestSuite):
# Default query options are populated on demand.
cls.default_query_options = {}
-
cls.impalad_test_service = cls.create_impala_service()
- cls.hdfs_client = cls.create_hdfs_client()
- cls.filesystem_client = cls.hdfs_client
- if IS_S3:
+
+    # There are multiple clients for interacting with the underlying storage service.
+    #
+    # There are two main types of clients: filesystem-specific clients and CLI clients.
+    # CLI clients all use the 'hdfs dfs' CLI to execute operations against a target
+    # filesystem.
+    #
+    # 'filesystem_client' is a generic interface for doing filesystem operations that
+    # works across all the filesystems that Impala supports. 'filesystem_client' uses
+    # either the HDFS command line (e.g. 'hdfs dfs'), a filesystem-specific library, or
+    # a wrapper around both, to implement common HDFS operations.
+    #
+    # *Test writers should always use 'filesystem_client' unless they are using filesystem
+    # specific functionality (e.g. HDFS ACLs).*
+    #
+    # The implementation of 'filesystem_client' for each filesystem is:
+    #   HDFS: uses a mixture of pywebhdfs (which is faster than the HDFS CLI) and the
+    #         HDFS CLI
+    #   S3: uses the HDFS CLI
+    #   ABFS: uses the HDFS CLI
+    #   ADLS: uses a mixture of azure-data-lake-store-python and the HDFS CLI (TODO:
+    #         this should completely switch to the HDFS CLI once we test it)
+    #
+    # 'hdfs_client' is a HDFS-specific client library, and it only works when running on
+    # HDFS. When using 'hdfs_client', the test must be skipped on everything other than
+    # HDFS. This is only really useful for tests that do HDFS ACL operations. The
+    # 'hdfs_client' supports all the methods and functionality of the 'filesystem_client',
+    # with additional support for ACL operations such as chmod, chown, getacl, and setacl.
+    # 'hdfs_client' is set to None on non-HDFS systems.
+
+    if IS_HDFS:
+      cls.hdfs_client = cls.create_hdfs_client()
+      cls.filesystem_client = cls.hdfs_client
+    elif IS_S3:
      # S3Guard needs filesystem operations to go through the s3 connector. Use the
      # HDFS command line client.
      cls.filesystem_client = HadoopFsCommandLineClient("S3")
@@ -253,11 +285,11 @@ class ImpalaTestSuite(BaseTestSuite):
@classmethod
def create_hdfs_client(cls):
if pytest.config.option.namenode_http_address is None:
- hdfs_client = get_hdfs_client_from_conf(HDFS_CONF)
+ webhdfs_client = get_webhdfs_client_from_conf(HDFS_CONF)
else:
host, port = pytest.config.option.namenode_http_address.split(":")
- hdfs_client = get_hdfs_client(host, port)
- return hdfs_client
+ webhdfs_client = get_webhdfs_client(host, port)
+ return DelegatingHdfsClient(webhdfs_client, HadoopFsCommandLineClient())
@classmethod
def all_db_names(cls):
diff --git a/tests/custom_cluster/test_coordinators.py
b/tests/custom_cluster/test_coordinators.py
index 997e7ed..0bf5164 100644
--- a/tests/custom_cluster/test_coordinators.py
+++ b/tests/custom_cluster/test_coordinators.py
@@ -149,7 +149,7 @@ class TestCoordinators(CustomClusterTestSuite):
# copy jar with TestUpdateUdf (old) to tmp.jar
check_call(["hadoop", "fs", "-mkdir", "-p", tgt_dir])
- check_call(["hadoop", "fs", "-put", "-f", old_src_path, tgt_path])
+ self.filesystem_client.copy_from_local(old_src_path, tgt_path)
coordinator = self.cluster.impalads[0]
try:
@@ -173,7 +173,7 @@ class TestCoordinators(CustomClusterTestSuite):
assert result.data == ['7300']
# copy a new jar with TestUpdateUdf (new) and NewReplaceStringUdf to
tmp.jar.
- check_call(["hadoop", "fs", "-put", "-f", new_src_path, tgt_path])
+ self.filesystem_client.copy_from_local(new_src_path, tgt_path)
# create a function for the updated TestUpdateUdf.
create_new_fn = (
@@ -210,7 +210,7 @@ class TestCoordinators(CustomClusterTestSuite):
# Copy jar to a new path.
tgt_path_2 = tgt_dir + "/tmp2.jar"
- check_call(["hadoop", "fs", "-put", "-f", old_src_path, tgt_path_2])
+ self.filesystem_client.copy_from_local(old_src_path, tgt_path_2)
# Add the function.
create_mismatch_fn = (
@@ -227,7 +227,7 @@ class TestCoordinators(CustomClusterTestSuite):
# Overwrite the jar, giving it a new mtime. The sleep prevents the write
to
# happen too quickly so that its within mtime granularity (1 second).
time.sleep(2)
- check_call(["hadoop", "fs", "-put", "-f", new_src_path, tgt_path_2])
+ self.filesystem_client.copy_from_local(new_src_path, tgt_path_2)
# Run the query. Expect the query fails due to mismatched libs at the
# coordinator and one of the executors.
diff --git a/tests/custom_cluster/test_hive_parquet_timestamp_conversion.py
b/tests/custom_cluster/test_hive_parquet_timestamp_conversion.py
index dab3918..35ac898 100644
--- a/tests/custom_cluster/test_hive_parquet_timestamp_conversion.py
+++ b/tests/custom_cluster/test_hive_parquet_timestamp_conversion.py
@@ -158,8 +158,8 @@ class
TestHiveParquetTimestampConversion(CustomClusterTestSuite):
"create table %s.t (i int, d timestamp) stored as parquet" %
unique_database)
tbl_loc = get_fs_path("/test-warehouse/%s.db/t" % unique_database)
- check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
- "/testdata/data/hive_single_value_timestamp.parq", tbl_loc])
+ self.filesystem_client.copy_from_local(os.environ['IMPALA_HOME'] +
+ "/testdata/data/hive_single_value_timestamp.parq", tbl_loc)
# TODO: other tests in this file could also use query option 'timezone' to
enable
# real data validation
diff --git a/tests/custom_cluster/test_insert_behaviour.py
b/tests/custom_cluster/test_insert_behaviour.py
index 7d3fb3e..29c3af6 100644
--- a/tests/custom_cluster/test_insert_behaviour.py
+++ b/tests/custom_cluster/test_insert_behaviour.py
@@ -20,7 +20,10 @@ import pytest
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfLocal
from tests.util.filesystem_utils import IS_ISILON, WAREHOUSE
-from tests.util.hdfs_util import HdfsConfig, get_hdfs_client,
get_hdfs_client_from_conf
+from tests.util.hdfs_util import (
+ HdfsConfig,
+ get_webhdfs_client,
+ get_webhdfs_client_from_conf)
TEST_TBL = "insert_inherit_permission"
@@ -34,10 +37,10 @@ class
TestInsertBehaviourCustomCluster(CustomClusterTestSuite):
super(TestInsertBehaviourCustomCluster, cls).setup_class()
if pytest.config.option.namenode_http_address is None:
hdfs_conf = HdfsConfig(pytest.config.option.minicluster_xml_conf)
- cls.hdfs_client = get_hdfs_client_from_conf(hdfs_conf)
+ cls.hdfs_client = get_webhdfs_client_from_conf(hdfs_conf)
else:
host, port = pytest.config.option.namenode_http_address.split(":")
- cls.hdfs_client = get_hdfs_client(host, port)
+ cls.hdfs_client = get_webhdfs_client(host, port)
def _check_partition_perms(self, part, perms):
ls = self.hdfs_client.get_file_dir_status("test-warehouse/%s/%s" %
(TEST_TBL, part))
diff --git a/tests/custom_cluster/test_parquet_max_page_header.py
b/tests/custom_cluster/test_parquet_max_page_header.py
index e1c9e0e..d40e04e 100644
--- a/tests/custom_cluster/test_parquet_max_page_header.py
+++ b/tests/custom_cluster/test_parquet_max_page_header.py
@@ -93,7 +93,7 @@ class TestParquetMaxPageHeader(CustomClusterTestSuite):
for i in xrange(self.MAX_STRING_LENGTH)])
random_text2 = "".join([random.choice(string.letters)
for i in xrange(self.MAX_STRING_LENGTH)])
- put = subprocess.Popen(["hadoop", "fs", "-put", "-f", "-", file_name],
+ put = subprocess.Popen(["hdfs", "dfs", "-put", "-d", "-f", "-", file_name],
stdin=subprocess.PIPE, bufsize=-1)
put.stdin.write(random_text1 + "\n")
put.stdin.write(random_text2)
diff --git a/tests/custom_cluster/test_udf_concurrency.py
b/tests/custom_cluster/test_udf_concurrency.py
index dfcd6bb..8643ecf 100644
--- a/tests/custom_cluster/test_udf_concurrency.py
+++ b/tests/custom_cluster/test_udf_concurrency.py
@@ -159,7 +159,7 @@ class TestUdfConcurrency(CustomClusterTestSuite):
# use a unique jar for this test to avoid interactions with other tests
# that use the same jar
- check_call(["hadoop", "fs", "-put", "-f", udf_src_path, udf_tgt_path])
+ self.filesystem_client.copy_from_local(udf_src_path, udf_tgt_path)
# create all the functions.
setup_client = self.create_impala_client()
diff --git a/tests/metadata/test_hidden_files.py
b/tests/metadata/test_hidden_files.py
index a48531a..8454b83 100644
--- a/tests/metadata/test_hidden_files.py
+++ b/tests/metadata/test_hidden_files.py
@@ -61,35 +61,35 @@ class TestHiddenFiles(ImpalaTestSuite):
ALLTYPES_LOC = "%s/alltypes" % WAREHOUSE
TEST_TBL_LOC = "%s/%s.db/%s" % (WAREHOUSE, db_name, tbl_name)
# Copy a visible file into one of the partitions.
- check_call(["hadoop", "fs", "-cp",
+ self.filesystem_client.copy(
"%s/year=2010/month=1/100101.txt" % ALLTYPES_LOC,
- "%s/year=2010/month=1/100101.txt" % TEST_TBL_LOC], shell=False)
+ "%s/year=2010/month=1/100101.txt" % TEST_TBL_LOC, overwrite=True)
# Add hidden files to the non-empty partition. Use upper case hidden suffixes.
- check_call(["hadoop", "fs", "-cp",
+ self.filesystem_client.copy(
"%s/year=2010/month=1/100101.txt" % ALLTYPES_LOC,
- "%s/year=2010/month=1/.100101.txt" % TEST_TBL_LOC], shell=False)
- check_call(["hadoop", "fs", "-cp",
+ "%s/year=2010/month=1/.100101.txt" % TEST_TBL_LOC, overwrite=True)
+ self.filesystem_client.copy(
"%s/year=2010/month=1/100101.txt" % ALLTYPES_LOC,
- "%s/year=2010/month=1/_100101.txt" % TEST_TBL_LOC], shell=False)
- check_call(["hadoop", "fs", "-cp",
+ "%s/year=2010/month=1/_100101.txt" % TEST_TBL_LOC, overwrite=True)
+ self.filesystem_client.copy(
"%s/year=2010/month=1/100101.txt" % ALLTYPES_LOC,
- "%s/year=2010/month=1/100101.txt.COPYING" % TEST_TBL_LOC],
shell=False)
- check_call(["hadoop", "fs", "-cp",
+ "%s/year=2010/month=1/100101.txt.COPYING" % TEST_TBL_LOC,
overwrite=True)
+ self.filesystem_client.copy(
"%s/year=2010/month=1/100101.txt" % ALLTYPES_LOC,
- "%s/year=2010/month=1/100101.txt.TMP" % TEST_TBL_LOC], shell=False)
+ "%s/year=2010/month=1/100101.txt.TMP" % TEST_TBL_LOC, overwrite=True)
# Add hidden files to the empty partition. Use lower case hidden suffixes.
- check_call(["hadoop", "fs", "-cp",
+ self.filesystem_client.copy(
"%s/year=2010/month=2/100201.txt" % ALLTYPES_LOC,
- "%s/year=2010/month=2/.100201.txt" % TEST_TBL_LOC], shell=False)
- check_call(["hadoop", "fs", "-cp",
+ "%s/year=2010/month=2/.100201.txt" % TEST_TBL_LOC, overwrite=True)
+ self.filesystem_client.copy(
"%s/year=2010/month=2/100201.txt" % ALLTYPES_LOC,
- "%s/year=2010/month=2/_100201.txt" % TEST_TBL_LOC], shell=False)
- check_call(["hadoop", "fs", "-cp",
+ "%s/year=2010/month=2/_100201.txt" % TEST_TBL_LOC, overwrite=True)
+ self.filesystem_client.copy(
"%s/year=2010/month=2/100201.txt" % ALLTYPES_LOC,
- "%s/year=2010/month=2/100201.txt.copying" % TEST_TBL_LOC],
shell=False)
- check_call(["hadoop", "fs", "-cp",
+ "%s/year=2010/month=2/100201.txt.copying" % TEST_TBL_LOC,
overwrite=True)
+ self.filesystem_client.copy(
"%s/year=2010/month=2/100201.txt" % ALLTYPES_LOC,
- "%s/year=2010/month=2/100201.txt.tmp" % TEST_TBL_LOC], shell=False)
+ "%s/year=2010/month=2/100201.txt.tmp" % TEST_TBL_LOC, overwrite=True)
def test_hidden_files_load(self, vector, unique_database):
"""Tests that an incremental refresh ignores hidden files."""
diff --git a/tests/metadata/test_refresh_partition.py
b/tests/metadata/test_refresh_partition.py
index 67901da..58d142b 100644
--- a/tests/metadata/test_refresh_partition.py
+++ b/tests/metadata/test_refresh_partition.py
@@ -213,7 +213,7 @@ class TestRefreshPartition(ImpalaTestSuite):
result = self.client.execute("select count(*) from %s" % table_name)
assert result.data == [str(0)]
dst_path = "%s/year=2010/month=1/%s" % (table_location, file_name)
- check_call(["hadoop", "fs", "-cp", "-f", src_file, dst_path], shell=False)
+ self.filesystem_client.copy(src_file, dst_path, overwrite=True)
# Check that data added is not visible before refresh
result = self.client.execute("select count(*) from %s" % table_name)
assert result.data == [str(0)]
@@ -250,8 +250,7 @@ class TestRefreshPartition(ImpalaTestSuite):
assert result.data == [str(0)]
dst_path = table_location + "/year=2010/month=%s/" + file_name
for month in [1, 2]:
- check_call(["hadoop", "fs", "-cp", "-f", src_file, dst_path % month],
- shell=False)
+ self.filesystem_client.copy(src_file, dst_path % month, overwrite=True)
# Check that data added is not visible before refresh
result = self.client.execute("select count(*) from %s" % table_name)
assert result.data == [str(0)]
diff --git a/tests/metadata/test_stale_metadata.py
b/tests/metadata/test_stale_metadata.py
index 9e5bf5f..223b454 100644
--- a/tests/metadata/test_stale_metadata.py
+++ b/tests/metadata/test_stale_metadata.py
@@ -136,4 +136,4 @@ class TestRewrittenFile(ImpalaTestSuite):
def __copy_file_to_test_table(self, src_path, table_location):
"""Copies the provided path to the test table, overwriting any previous
file."""
dst_path = "%s/%s" % (table_location, self.FILE_NAME)
- check_call(["hadoop", "fs", "-cp", "-f", src_path, dst_path], shell=False)
+ self.filesystem_client.copy(src_path, dst_path, overwrite=True)
diff --git a/tests/query_test/test_compressed_formats.py
b/tests/query_test/test_compressed_formats.py
index 2748bd4..81320d6 100644
--- a/tests/query_test/test_compressed_formats.py
+++ b/tests/query_test/test_compressed_formats.py
@@ -71,13 +71,15 @@ class TestCompressedFormats(ImpalaTestSuite):
def test_compressed_formats(self, vector):
file_format = vector.get_value('file_format')
extension, suffix = vector.get_value('compression_format')
- if file_format in ['rc', 'seq', 'text']:
+ if file_format in ['rc', 'seq']:
# TODO: How about LZO?
# Test that {gzip,snappy,bzip,deflate}-compressed
# {RC,sequence,text} files are supported.
db_suffix = '_%s_%s' % (file_format, suffix)
self._copy_and_query_compressed_file(
'tinytable', db_suffix, suffix, '000000_0', extension)
+ elif file_format is 'text':
+ pytest.xfail('IMPALA-9004: TestCompressedFormats is broken for text files')
else:
assert False, "Unknown file_format: %s" % file_format
@@ -103,7 +105,7 @@ class TestCompressedFormats(ImpalaTestSuite):
# Create the table
self.run_stmt_in_hive(hive_cmd)
- call(["hadoop", "fs", "-cp", src_file, dest_file])
+ self.filesystem_client.copy(src_file, dest_file, overwrite=True)
# Try to read the compressed file with extension
query = 'select count(*) from %s' % dest_table
try:
@@ -112,6 +114,7 @@ class TestCompressedFormats(ImpalaTestSuite):
result = self.execute_scalar(query)
# Fail iff we expected an error
assert expected_error is None, 'Query is expected to fail'
+ assert result and int(result) > 0
except Exception as e:
error_msg = str(e)
print error_msg
@@ -207,7 +210,7 @@ class TestLargeCompressedFile(ImpalaTestSuite):
# Total uncompressed size of a nested structure.
total_chunk_size = num_blocks_per_chunk * payload_size
- hdfs_put = subprocess.Popen(["hadoop", "fs", "-put", "-f", "-", file_name],
+ hdfs_put = subprocess.Popen(["hdfs", "dfs", "-put", "-d", "-f", "-",
file_name],
stdin=subprocess.PIPE, bufsize=-1)
for i in range(num_chunks):
hdfs_put.stdin.write(struct.pack('>i', total_chunk_size))
diff --git a/tests/query_test/test_hdfs_file_mods.py
b/tests/query_test/test_hdfs_file_mods.py
index 30698e1..48e8995 100644
--- a/tests/query_test/test_hdfs_file_mods.py
+++ b/tests/query_test/test_hdfs_file_mods.py
@@ -56,7 +56,8 @@ class TestHdfsFileMods(ImpalaTestSuite):
# Use HDFS commands to clone the table's files at the hdfs level
old_table_location =
"{0}/test-warehouse/tinytable".format(FILESYSTEM_PREFIX)
call(["hdfs", "dfs", "-mkdir", new_table_location])
- call(["hdfs", "dfs", "-cp", old_table_location + "/*", new_table_location])
+ self.filesystem_client.copy(old_table_location + "/*", new_table_location,
+ overwrite=True)
# Create an external table with the new files (tinytable has two string
columns)
create_table = "create external table {0}.t1 (a string, b string) "\
diff --git a/tests/query_test/test_insert_parquet.py
b/tests/query_test/test_insert_parquet.py
index f224c73..ac647ed 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -333,8 +333,8 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
file has the same column type metadata as the generated one."""
hdfs_path = (os.environ['DEFAULT_FS'] + "/test-warehouse/{0}.db/"
"signed_integer_logical_types.parquet").format(unique_database)
- check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
- '/testdata/data/signed_integer_logical_types.parquet',
hdfs_path])
+ self.filesystem_client.copy_from_local(os.environ['IMPALA_HOME'] +
+ '/testdata/data/signed_integer_logical_types.parquet', hdfs_path)
# Create table with signed integer logical types
src_tbl = "{0}.{1}".format(unique_database, "read_write_logical_type_src")
create_tbl_stmt = """create table {0} like parquet "{1}"
diff --git a/tests/query_test/test_multiple_filesystems.py
b/tests/query_test/test_multiple_filesystems.py
index 2e97821..91abe53 100644
--- a/tests/query_test/test_multiple_filesystems.py
+++ b/tests/query_test/test_multiple_filesystems.py
@@ -49,10 +49,10 @@ class TestMultipleFilesystems(ImpalaTestSuite):
call(["hadoop", "fs", "-mkdir",
get_secondary_fs_path("/multi_fs_tests/")], shell=False)
check_call(["hadoop", "fs", "-mkdir",
get_secondary_fs_path("/multi_fs_tests/%s.db/" % db_name)],
shell=False)
- check_call(["hadoop", "fs", "-cp", "/test-warehouse/alltypes_parquet/",
- get_secondary_fs_path("/multi_fs_tests/%s.db/" % db_name)],
shell=False)
- check_call(["hadoop", "fs", "-cp", "/test-warehouse/tinytable/",
- get_secondary_fs_path("/multi_fs_tests/%s.db/" % db_name)],
shell=False)
+ self.filesystem_client.copy("/test-warehouse/alltypes_parquet/",
+ get_secondary_fs_path("/multi_fs_tests/%s.db/" % db_name),
overwrite=True)
+ self.filesystem_client.copy("/test-warehouse/tinytable/",
get_secondary_fs_path(
+ "/multi_fs_tests/%s.db/" % db_name), overwrite=True)
@pytest.mark.execute_serially
def test_multiple_filesystems(self, vector, unique_database):
diff --git a/tests/query_test/test_nested_types.py
b/tests/query_test/test_nested_types.py
index 779af4a..d1e2719 100644
--- a/tests/query_test/test_nested_types.py
+++ b/tests/query_test/test_nested_types.py
@@ -552,7 +552,6 @@ class TestParquetArrayEncodings(ImpalaTestSuite):
# ...f22 = 220
# ..F11 = 110
# ..F12 = 120
- @SkipIfS3.eventually_consistent
def test_ambiguous_list(self, vector, unique_database):
"""IMPALA-4725: Tests the schema-resolution behavior with different values
for the
PARQUET_ARRAY_RESOLUTION and PARQUET_FALLBACK_SCHEMA_RESOLUTION query
options.
@@ -589,7 +588,7 @@ class TestParquetArrayEncodings(ImpalaTestSuite):
self.client.execute("create table %s.%s (%s) stored as parquet location
'%s'" %
(dbname, tablename, columns, location))
local_path = self.TESTFILE_DIR + "/" + filename
- check_call(["hadoop", "fs", "-put", local_path, location], shell=False)
+ self.filesystem_client.copy_from_local(local_path, location)
class TestMaxNestingDepth(ImpalaTestSuite):
# Should be kept in sync with the FE's Type.MAX_NESTING_DEPTH
@@ -623,9 +622,9 @@ class TestMaxNestingDepth(ImpalaTestSuite):
def __create_parquet_tables(self, unique_database, as_target=True):
"""Create Parquet tables from files. If 'as_target' is False, the Parquet
tables will
be used to create ORC tables, so we add a suffix in the table names."""
- check_call(["hdfs", "dfs", "-copyFromLocal",
+ self.filesystem_client.copy_from_local(
"%s/testdata/max_nesting_depth" % os.environ['IMPALA_HOME'],
- "%s/%s.db/" % (WAREHOUSE, unique_database)], shell=False)
+ "%s/%s.db/" % (WAREHOUSE, unique_database))
tbl_suffix = '' if as_target else self.TEMP_TABLE_SUFFIX
for tbl in self.TABLES:
tbl_name = "%s.%s_tbl%s" % (unique_database, tbl, tbl_suffix)
diff --git a/tests/query_test/test_scanners.py
b/tests/query_test/test_scanners.py
index 198ae4c..5b17ecb 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -714,7 +714,6 @@ class TestParquet(ImpalaTestSuite):
assert c_schema_elt.converted_type == ConvertedType.UTF8
assert d_schema_elt.converted_type == None
- @SkipIfS3.eventually_consistent
def test_resolution_by_name(self, vector, unique_database):
self.run_test_case('QueryTest/parquet-resolution-by-name', vector,
use_db=unique_database)
@@ -1117,7 +1116,7 @@ class TestTextSplitDelimiters(ImpalaTestSuite):
with tempfile.NamedTemporaryFile() as f:
f.write(data)
f.flush()
- check_call(['hadoop', 'fs', '-copyFromLocal', f.name, location])
+ self.filesystem_client.copy_from_local(f.name, location)
self.client.execute("refresh %s" % qualified_table_name);
vector.get_value('exec_option')['max_scan_range_length'] =
max_scan_range_length
@@ -1270,7 +1269,7 @@ class TestOrc(ImpalaTestSuite):
tbl_loc = get_fs_path("/test-warehouse/%s.db/%s" % (db, tbl))
# set block size to 156672 so lineitem_threeblocks.orc occupies 3 blocks,
# lineitem_sixblocks.orc occupies 6 blocks.
- check_call(['hdfs', 'dfs', '-Ddfs.block.size=156672', '-copyFromLocal',
+ check_call(['hdfs', 'dfs', '-Ddfs.block.size=156672', '-copyFromLocal', '-d', '-f',
os.environ['IMPALA_HOME'] + "/testdata/LineItemMultiBlock/" + file,
tbl_loc])
def _misaligned_orc_stripes_helper(
diff --git a/tests/query_test/test_scanners_fuzz.py
b/tests/query_test/test_scanners_fuzz.py
index a7a578c..1a505ca 100644
--- a/tests/query_test/test_scanners_fuzz.py
+++ b/tests/query_test/test_scanners_fuzz.py
@@ -176,7 +176,7 @@ class TestScannersFuzzing(ImpalaTestSuite):
# Copy all of the local files and directories to hdfs.
to_copy = ["%s/%s" % (tmp_table_dir, file_or_dir)
for file_or_dir in os.listdir(tmp_table_dir)]
- check_call(['hdfs', 'dfs', '-copyFromLocal', '-d'] + to_copy +
[fuzz_table_location])
+ self.filesystem_client.copy_from_local(to_copy, fuzz_table_location)
if "SCANNER_FUZZ_KEEP_FILES" not in os.environ:
shutil.rmtree(tmp_table_dir)
diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py
index 3839044..8572db8 100644
--- a/tests/query_test/test_udfs.py
+++ b/tests/query_test/test_udfs.py
@@ -338,7 +338,7 @@ class TestUdfExecution(TestUdfBase):
try:
with open(dir_name, "w") as f:
f.write("Hello World")
- check_call(["hadoop", "fs", "-put", "-f", f.name, hdfs_path])
+ self.filesystem_client.copy_from_local(f.name, hdfs_path)
if vector.get_value('exec_option')['disable_codegen']:
self.run_test_case('QueryTest/udf-errors', vector,
use_db=unique_database)
finally:
@@ -436,7 +436,7 @@ class TestUdfTargeted(TestUdfBase):
unique_database, tgt_udf_path))
query = "select `{0}`.fn_invalid_symbol('test')".format(unique_database)
- check_call(["hadoop", "fs", "-put", "-f", src_udf_path, tgt_udf_path])
+ self.filesystem_client.copy_from_local(src_udf_path, tgt_udf_path)
self.client.execute(drop_fn_stmt)
self.client.execute(create_fn_stmt)
for _ in xrange(5):
@@ -471,7 +471,7 @@ class TestUdfTargeted(TestUdfBase):
jar_path = get_fs_path("/test-warehouse/{0}.db/".format(unique_database)
+ get_random_id(5) + ".jar")
hive_jar = get_fs_path("/test-warehouse/hive-exec.jar")
- check_call(["hadoop", "fs", "-cp", hive_jar, jar_path])
+ self.filesystem_client.copy(hive_jar, jar_path)
drop_fn_stmt = (
"drop function if exists "
"`{0}`.`pi_missing_jar`()".format(unique_database))
@@ -528,14 +528,14 @@ class TestUdfTargeted(TestUdfBase):
query_stmt = "select
`{0}`.`udf_update_test_drop`()".format(unique_database)
# Put the old UDF binary on HDFS, make the UDF in Impala and run it.
- check_call(["hadoop", "fs", "-put", "-f", old_udf, udf_dst])
+ self.filesystem_client.copy_from_local(old_udf, udf_dst)
self.execute_query_expect_success(self.client, drop_fn_stmt, exec_options)
self.execute_query_expect_success(self.client, create_fn_stmt,
exec_options)
self._run_query_all_impalads(exec_options, query_stmt, ["Old UDF"])
# Update the binary, drop and create the function again. The new binary
should
# be running.
- check_call(["hadoop", "fs", "-put", "-f", new_udf, udf_dst])
+ self.filesystem_client.copy_from_local(new_udf, udf_dst)
self.execute_query_expect_success(self.client, drop_fn_stmt, exec_options)
self.execute_query_expect_success(self.client, create_fn_stmt,
exec_options)
self._run_query_all_impalads(exec_options, query_stmt, ["New UDF"])
@@ -568,7 +568,7 @@ class TestUdfTargeted(TestUdfBase):
query_template = "select `{0}`.`{{0}}`()".format(unique_database)
# Put the old UDF binary on HDFS, make the UDF in Impala and run it.
- check_call(["hadoop", "fs", "-put", "-f", old_udf, udf_dst])
+ self.filesystem_client.copy_from_local(old_udf, udf_dst)
self.execute_query_expect_success(
self.client, create_fn_template.format(old_function_name),
exec_options)
self._run_query_all_impalads(
@@ -576,7 +576,7 @@ class TestUdfTargeted(TestUdfBase):
# Update the binary, and create a new function using the binary. The new
binary
# should be running.
- check_call(["hadoop", "fs", "-put", "-f", new_udf, udf_dst])
+ self.filesystem_client.copy_from_local(new_udf, udf_dst)
self.execute_query_expect_success(
self.client, create_fn_template.format(new_function_name),
exec_options)
self._run_query_all_impalads(
diff --git a/tests/util/adls_util.py b/tests/util/adls_util.py
index b72b4c1..3b9ea0f 100644
--- a/tests/util/adls_util.py
+++ b/tests/util/adls_util.py
@@ -23,6 +23,7 @@
from azure.datalake.store import core, lib, multithread, exceptions
from tests.util.filesystem_base import BaseFilesystem
from tests.util.filesystem_utils import ADLS_CLIENT_ID, ADLS_TENANT_ID,
ADLS_CLIENT_SECRET
+from tests.util.hdfs_util import HadoopFsCommandLineClient
class ADLSClient(BaseFilesystem):
@@ -31,6 +32,7 @@ class ADLSClient(BaseFilesystem):
client_secret = ADLS_CLIENT_SECRET,
client_id = ADLS_CLIENT_ID)
self.adlsclient = core.AzureDLFileSystem(self.token, store_name=store)
+ self.adls_cli_client = HadoopFsCommandLineClient("ADLS")
def create_file(self, path, file_data, overwrite=True):
if not overwrite and self.exists(path): return False
@@ -43,13 +45,11 @@ class ADLSClient(BaseFilesystem):
self.adlsclient.mkdir(path)
return True
- def copy(self, src, dst):
- # The ADLS Python client doesn't support cp() yet, so we have to download
and
- # reupload to the destination.
- src_contents = self.adlsclient.cat(src)
- self.create_file(dst, src_contents, overwrite=True)
- assert self.exists(dst), \
- 'ADLS copy failed: Destination file {dst} does not
exist'.format(dst=dst)
+ def copy(self, src, dst, overwrite=True):
+ self.adls_cli_client.copy(src, dst, overwrite)
+
+ def copy_from_local(self, src, dst):
+ self.adls_cli_client.copy_from_local(src, dst)
def ls(self, path):
file_paths = self.adlsclient.ls(path)
diff --git a/tests/util/filesystem_base.py b/tests/util/filesystem_base.py
index 9396a2e..f744530 100644
--- a/tests/util/filesystem_base.py
+++ b/tests/util/filesystem_base.py
@@ -36,11 +36,18 @@ class BaseFilesystem(object):
pass
@abstractmethod
- def copy(self, src, dst):
+ def copy(self, src, dst, overwrite):
"""Copy a file from 'src' to 'dst'. Throws an exception if unsuccessful."""
pass
@abstractmethod
+ def copy_from_local(self, src, dst):
+ """Copies a file from 'src' file on the local filesystem to the 'dst',
which can be
+ on any HDFS compatible filesystem. Fails if the src file is not on the
local
+ filesystem. Throws an exception if unsuccessful."""
+ pass
+
+ @abstractmethod
def ls(self, path):
"""Return a list of all files/dirs/keys in path. Throws an exception if
path
is invalid."""
diff --git a/tests/util/hdfs_util.py b/tests/util/hdfs_util.py
index f015f1f..b22dec4 100644
--- a/tests/util/hdfs_util.py
+++ b/tests/util/hdfs_util.py
@@ -55,7 +55,61 @@ CORE_CONF = HdfsConfig(os.path.join(environ['HADOOP_CONF_DIR'], "core-site.xml")
NAMENODE = FILESYSTEM_PREFIX or CORE_CONF.get('fs.defaultFS')
-class PyWebHdfsClientWithChmod(PyWebHdfsClient, BaseFilesystem):
+class DelegatingHdfsClient(BaseFilesystem):
+ """HDFS client that either delegates to a PyWebHdfsClientWithChmod or a
+  HadoopFsCommandLineClient. Since PyWebHdfsClientWithChmod does not have good support
+  for copying files, this class delegates to HadoopFsCommandLineClient for all copy
+  operations."""
+
+ def __init__(self, webhdfs_client, hdfs_filesystem_client):
+ self.webhdfs_client = webhdfs_client
+ self.hdfs_filesystem_client = hdfs_filesystem_client
+ super(DelegatingHdfsClient, self).__init__()
+
+ def create_file(self, path, file_data, overwrite=True):
+ return self.webhdfs_client.create_file(path, file_data,
overwrite=overwrite)
+
+ def make_dir(self, path, permission=None):
+ if permission:
+ return self.webhdfs_client.make_dir(path, permission=permission)
+ else:
+ return self.webhdfs_client.make_dir(path)
+
+ def copy(self, src, dst, overwrite=False):
+ self.hdfs_filesystem_client.copy(src, dst, overwrite)
+
+ def copy_from_local(self, src, dst):
+ self.hdfs_filesystem_client.copy_from_local(src, dst)
+
+ def ls(self, path):
+ return self.webhdfs_client.ls(path)
+
+ def exists(self, path):
+ return self.webhdfs_client.exists(path)
+
+ def delete_file_dir(self, path, recursive=False):
+ return self.webhdfs_client.delete_file_dir(path, recursive=recursive)
+
+ def get_file_dir_status(self, path):
+ return self.webhdfs_client.get_file_dir_status(path)
+
+ def get_all_file_sizes(self, path):
+ return self.webhdfs_client.get_all_file_sizes(path)
+
+ def chmod(self, path, permission):
+ return self.webhdfs_client.chmod(path, permission)
+
+ def chown(self, path, user, group):
+ return self.webhdfs_client.chown(path, user, group)
+
+ def setacl(self, path, acls):
+ return self.webhdfs_client.setacl(path, acls)
+
+ def getacl(self, path):
+ return self.webhdfs_client.getacl(path)
+
+
+class PyWebHdfsClientWithChmod(PyWebHdfsClient):
def chmod(self, path, permission):
"""Set the permission of 'path' to 'permission' (specified as an octal
string, e.g.
'775'"""
@@ -115,22 +169,6 @@ class PyWebHdfsClientWithChmod(PyWebHdfsClient,
BaseFilesystem):
sizes += [status['length']]
return sizes
- def copy(self, src, dest):
- """Copies a file in hdfs from src to destination
-
- Simulates hdfs dfs (or hadoop fs) -cp <src> <dst>. Does not resolve all
the files if
- the source or destination is a directory. Files need to be explicitly
copied.
- TODO: Infer whether the source or destination is a directory and do this
implicitly.
- TODO: Take care of larger files by always reading/writing them in small
chunks.
- """
- assert self.get_file_dir_status(src)
- # Get the data
- data = self.read_file(src)
- # Copy the data
- self.create_file(dest, data)
- assert self.get_file_dir_status(dest)
- assert self.read_file(dest) == data
-
def ls(self, path):
"""Returns a list of all file and directory names in 'path'"""
# list_dir() returns a dictionary of file statues. This function picks out
the
@@ -161,8 +199,8 @@ class HadoopFsCommandLineClient(BaseFilesystem):
super(HadoopFsCommandLineClient, self).__init__()
def _hadoop_fs_shell(self, command):
- """Helper function wrapper around 'hadoop fs' takes in the arguments as a
list."""
- hadoop_command = ['hadoop', 'fs'] + command
+ """Helper function wrapper around 'hdfs dfs' takes in the arguments as a
list."""
+ hadoop_command = ['hdfs', 'dfs'] + command
process = subprocess.Popen(hadoop_command,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
@@ -176,8 +214,10 @@ class HadoopFsCommandLineClient(BaseFilesystem):
if not overwrite and self.exists(fixed_path): return False
with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
tmp_file.write(file_data)
- (status, stdout, stderr) = \
- self._hadoop_fs_shell(['-put', tmp_file.name, fixed_path])
+ put_cmd_params = ['-put', '-d']
+ if overwrite: put_cmd_params.append('-f')
+ put_cmd_params.extend([tmp_file.name, fixed_path])
+ (status, stdout, stderr) = self._hadoop_fs_shell(put_cmd_params)
return status == 0
def make_dir(self, path, permission=None):
@@ -186,18 +226,35 @@ class HadoopFsCommandLineClient(BaseFilesystem):
(status, stdout, stderr) = self._hadoop_fs_shell(['-mkdir', '-p',
fixed_path])
return status == 0
- def copy(self, src, dst):
- """Copy the source file to the destination."""
+ def copy(self, src, dst, overwrite):
+ """Copy the source file to the destination. Specifes the '-d' option by
default, which
+ 'Skip[s] creation of temporary file with the suffix ._COPYING_.' to avoid
extraneous
+ copies on S3. If overwrite is true, the destination file is overwritten,
set to false
+ by default for backwards compatibility."""
fixed_src = self._normalize_path(src)
fixed_dst = self._normalize_path(dst)
- (status, stdout, stderr) = \
- self._hadoop_fs_shell(['-cp', fixed_src, fixed_dst])
+ cp_cmd_params = ['-cp', '-d']
+ if overwrite: cp_cmd_params.append('-f')
+ cp_cmd_params.extend([fixed_src, fixed_dst])
+ (status, stdout, stderr) = self._hadoop_fs_shell(cp_cmd_params)
assert status == 0, \
'{0} copy failed: '.format(self.filesystem_type) + stderr + "; " +
stdout
assert self.exists(dst), \
'{fs_type} copy failed: Destination file {dst} does not exist'\
.format(fs_type=self.filesystem_type, dst=dst)
+ def copy_from_local(self, src, dst):
+ """Wrapper around 'hdfs dfs -copyFromLocal [-f] [-p] [-l] [-d] <localsrc>
... <dst>'.
+ Overwrites files by default to avoid S3 consistency issues. Specifes the
'-d' option
+ by default, which 'Skip[s] creation of temporary file with the suffix
._COPYING_.' to
+ avoid extraneous copies on S3. 'src' must be either a string or a list of
strings."""
+ assert isinstance(src, list) or isinstance(src, basestring)
+ src_list = src if isinstance(src, list) else [src]
+ (status, stdout, stderr) = self._hadoop_fs_shell(['-copyFromLocal', '-d',
'-f'] +
+ src_list + [dst])
+ assert status == 0, '{0} copy from {1} to {2} failed:
'.format(self.filesystem_type,
+ src, dst) + stderr + '; ' + stdout
+
def _inner_ls(self, path):
"""List names, lengths, and mode for files/directories under the specified
path."""
fixed_path = self._normalize_path(path)
@@ -253,16 +310,16 @@ class HadoopFsCommandLineClient(BaseFilesystem):
return path if path.startswith('/') else '/' + path
-def get_hdfs_client_from_conf(conf):
+def get_webhdfs_client_from_conf(conf):
"""Returns a new HTTP client for an HDFS cluster using an HdfsConfig
object"""
hostport = conf.get('dfs.namenode.http-address')
if hostport is None:
raise Exception("dfs.namenode.http-address not found in config")
host, port = hostport.split(":")
- return get_hdfs_client(host=host, port=port)
+ return get_webhdfs_client(host=host, port=port)
-def get_hdfs_client(host, port, user_name=getpass.getuser()):
+def get_webhdfs_client(host, port, user_name=getpass.getuser()):
"""Returns a new HTTP client for an HDFS cluster using an explict host:port
pair"""
return PyWebHdfsClientWithChmod(host=host, port=port, user_name=user_name)
@@ -271,7 +328,3 @@ def get_default_hdfs_config():
core_site_path = os.path.join(environ.get('HADOOP_CONF_DIR'),
'core-site.xml')
hdfs_site_path = os.path.join(environ.get('HADOOP_CONF_DIR'),
'hdfs-site.xml')
return HdfsConfig(core_site_path, hdfs_site_path)
-
-
-def create_default_hdfs_client():
- return get_hdfs_client_from_conf(get_default_hdfs_config())