IMPALA-5383: Fix PARQUET_FILE_SIZE option for ADLS. The PARQUET_FILE_SIZE query option doesn't work with ADLS because the AdlFileSystem doesn't have a notion of block sizes, and Impala depends on the filesystem remembering the block size, which is then used as the target Parquet file size (this is done for HDFS so that the Parquet file size and block size match even if the PARQUET_FILE_SIZE isn't a valid block size).
We special-case ADLS, just as we do for S3, to bypass the FileSystem block size and instead use the requested PARQUET_FILE_SIZE as the output partition's block_size (and consequently the Parquet file target size). Testing: Re-enabled test_insert_parquet_verify_size() for ADLS. Also fixed a miscellaneous bug with the ADLS client listing helper function. Change-Id: I474a913b0ff9b2709f397702b58cb1c74251c25b Reviewed-on: http://gerrit.cloudera.org:8080/7018 Reviewed-by: Sailesh Mukil <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/1f34a9e7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/1f34a9e7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/1f34a9e7 Branch: refs/heads/master Commit: 1f34a9e7034cb1b068dbcaba94d3f01295995fee Parents: 9caf214 Author: Sailesh Mukil <[email protected]> Authored: Tue May 30 18:56:43 2017 +0000 Committer: Impala Public Jenkins <[email protected]> Committed: Wed May 31 07:41:24 2017 +0000 ---------------------------------------------------------------------- be/src/exec/hdfs-table-sink.cc | 8 +++++--- be/src/util/hdfs-util.cc | 7 +++++++ be/src/util/hdfs-util.h | 3 +++ tests/query_test/test_insert_parquet.py | 4 ---- tests/util/adls_util.py | 3 ++- 5 files changed, 17 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1f34a9e7/be/src/exec/hdfs-table-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc index 9da6e57..b49451a 100644 --- a/be/src/exec/hdfs-table-sink.cc +++ b/be/src/exec/hdfs-table-sink.cc @@ -390,10 +390,12 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state, output_partition->current_file_name)); } - if 
(IsS3APath(output_partition->current_file_name.c_str())) { + if (IsS3APath(output_partition->current_file_name.c_str()) || + IsADLSPath(output_partition->current_file_name.c_str())) { // On S3A, the file cannot be stat'ed until after it's closed, and even so, the block - // size reported will be just the filesystem default. So, remember the requested - // block size. + // size reported will be just the filesystem default. Similarly, the block size + // reported for ADLS will be the filesystem default. So, remember the requested block + // size. output_partition->block_size = block_size; } else { // HDFS may choose to override the block size that we've recommended, so for non-S3 http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1f34a9e7/be/src/util/hdfs-util.cc ---------------------------------------------------------------------- diff --git a/be/src/util/hdfs-util.cc b/be/src/util/hdfs-util.cc index 440b68d..28d318c 100644 --- a/be/src/util/hdfs-util.cc +++ b/be/src/util/hdfs-util.cc @@ -85,6 +85,13 @@ bool IsS3APath(const char* path) { return strncmp(path, "s3a://", 6) == 0; } +bool IsADLSPath(const char* path) { + if (strstr(path, ":/") == NULL) { + return ExecEnv::GetInstance()->default_fs().compare(0, 6, "adl://") == 0; + } + return strncmp(path, "adl://", 6) == 0; +} + // Returns the length of the filesystem name in 'path' which is the length of the // 'scheme://authority'. Returns 0 if the path is unqualified. static int GetFilesystemNameLength(const char* path) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1f34a9e7/be/src/util/hdfs-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/hdfs-util.h b/be/src/util/hdfs-util.h index 32be643..b9f415b 100644 --- a/be/src/util/hdfs-util.h +++ b/be/src/util/hdfs-util.h @@ -50,6 +50,9 @@ bool IsHdfsPath(const char* path); /// Returns true iff the path refers to a location on an S3A filesystem. 
bool IsS3APath(const char* path); +/// Returns true iff the path refers to a location on an ADL filesystem. +bool IsADLSPath(const char* path); + /// Returns true iff 'pathA' and 'pathB' are on the same filesystem. bool FilesystemsMatch(const char* pathA, const char* pathB); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1f34a9e7/tests/query_test/test_insert_parquet.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py index ee24549..c19363f 100644 --- a/tests/query_test/test_insert_parquet.py +++ b/tests/query_test/test_insert_parquet.py @@ -161,10 +161,6 @@ class TestInsertParquetVerifySize(ImpalaTestSuite): cls.ImpalaTestMatrix.add_dimension( ImpalaTestDimension("compression_codec", *PARQUET_CODECS)) - # ADLS does not have a configurable block size, so the 'PARQUET_FILE_SIZE' option - # that's passed as a hint to Hadoop is ignored for AdlFileSystem. So, we skip this - # test for ADLS. - @SkipIfADLS.hdfs_block_size @SkipIfIsilon.hdfs_block_size @SkipIfLocal.hdfs_client def test_insert_parquet_verify_size(self, vector, unique_database): http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1f34a9e7/tests/util/adls_util.py ---------------------------------------------------------------------- diff --git a/tests/util/adls_util.py b/tests/util/adls_util.py index f616074..b72b4c1 100644 --- a/tests/util/adls_util.py +++ b/tests/util/adls_util.py @@ -73,4 +73,5 @@ class ADLSClient(BaseFilesystem): def get_all_file_sizes(self, path): """Returns a list of integers which are all the file sizes of files found under 'path'.""" - return [self.adlsclient.info(f)['length'] for f in self.ls(path)] + return [self.adlsclient.info(f)['length'] for f in self.adlsclient.ls(path) \ + if self.adlsclient.info(f)['type'] == 'FILE']
