IMPALA-3680: Clean up the scan range state after failed hdfs cache reads

Currently we don't reset the file read offset if a zero-copy read (ZCR)
fails. Because of this, when we switch to the normal read path, we hit
the end of the scan range (eosr) before reading the expected data
length. If both the ReadFromCache() and ReadRange() calls fail without
reading any data, we end up creating a whole list of scan ranges, each
of size 1KB (DEFAULT_READ_PAST_SIZE), on the assumption that we are
reading past the scan range. This causes a severe performance hit. This
patch calls ScanRange::Close() after a failed cache read to clean up
the file system state so that re-reads start from the beginning of the
scan range.
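To make the failure mode concrete, here is a minimal, self-contained
sketch of the pitfall in plain Python file I/O terms. It is illustrative
only, not Impala code: try_cached_read() is a hypothetical stand-in for
hadoopReadZero(), and the reset mirrors what the added ScanRange::Close()
call achieves.

#!/usr/bin/env python
# A failed "fast path" read still advances the file offset, so a fallback
# read that trusts the offset hits end-of-range early unless state is reset.
import os
import sys

def try_cached_read(fd):
  """Hypothetical stand-in for hadoopReadZero(): fails, but only after
  consuming some bytes, like a ZCR attempt against an encryption zone."""
  os.read(fd, 16)  # Advances the offset even though we report failure.
  return False

def read_with_fallback(path):
  fd = os.open(path, os.O_RDONLY)
  try:
    if not try_cached_read(fd):
      # Reset the file state before the normal read path; this mirrors the
      # ScanRange::Close() call added by this patch. Without it, the
      # fallback read resumes mid-range and sees fewer bytes than expected.
      os.lseek(fd, 0, os.SEEK_SET)
    return os.read(fd, 4096)
  finally:
    os.close(fd)

if __name__ == '__main__':
  print("read %d bytes" % len(read_with_fallback(sys.argv[1])))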
This was hit as a part of debugging IMPALA-3679, where queries on 1 GB
of cached data were running ~20x slower than non-cached runs.

Change-Id: I0a9ea19dd8571b01d2cd5b87da1c259219f6297a
Reviewed-on: http://gerrit.cloudera.org:8080/3313
Reviewed-by: Michael Brown <[email protected]>
Tested-by: Bharath Vissapragada <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d1975166
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d1975166
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d1975166

Branch: refs/heads/master
Commit: d19751669acb5b5886792d8606d5dca7ee5b30a8
Parents: 08e8de7
Author: Bharath Vissapragada <[email protected]>
Authored: Mon Jun 6 03:46:11 2016 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Tue Jul 5 13:37:26 2016 -0700

----------------------------------------------------------------------
 be/src/runtime/disk-io-mgr-scan-range.cc           | 11 ++-
 .../common/etc/hadoop/conf/hdfs-site.xml.tmpl      |  2 +-
 tests/query_test/test_hdfs_caching.py              | 75 +++++++++++++++++++-
 3 files changed, 83 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d1975166/be/src/runtime/disk-io-mgr-scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc
index 25399bc..9a7e39a 100644
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ b/be/src/runtime/disk-io-mgr-scan-range.cc
@@ -423,9 +423,14 @@ Status DiskIoMgr::ScanRange::ReadFromCache(bool* read_succeeded) {
     DCHECK(cached_buffer_ == NULL);
     cached_buffer_ = hadoopReadZero(hdfs_file_->file(), io_mgr_->cached_read_options_,
         len());
-
-    // Data was not cached, caller will fall back to normal read path.
-    if (cached_buffer_ == NULL) return Status::OK();
+  }
+  // Data was not cached, caller will fall back to normal read path.
+  if (cached_buffer_ == NULL) {
+    VLOG_QUERY << "Cache read failed for scan range: " << DebugString()
+        << ". Switching to disk read path.";
+    // Clean up the scan range state before re-issuing it.
+    Close();
+    return Status::OK();
   }
 
   // Cached read succeeded.

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d1975166/testdata/cluster/node_templates/common/etc/hadoop/conf/hdfs-site.xml.tmpl
----------------------------------------------------------------------
diff --git a/testdata/cluster/node_templates/common/etc/hadoop/conf/hdfs-site.xml.tmpl b/testdata/cluster/node_templates/common/etc/hadoop/conf/hdfs-site.xml.tmpl
index 740bebd..575fc0a 100644
--- a/testdata/cluster/node_templates/common/etc/hadoop/conf/hdfs-site.xml.tmpl
+++ b/testdata/cluster/node_templates/common/etc/hadoop/conf/hdfs-site.xml.tmpl
@@ -82,7 +82,7 @@
     <value>134217728</value>
   </property>
 
-  <!-- Set the max cached memory to ~64kb. This must be less than ulimit -1 -->
+  <!-- Set the max cached memory to ~64kb. This must be less than ulimit -l -->
   <property>
     <name>dfs.datanode.max.locked.memory</name>
     <value>64000</value>

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d1975166/tests/query_test/test_hdfs_caching.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_hdfs_caching.py b/tests/query_test/test_hdfs_caching.py
index 10a9e2a..e275cbd 100644
--- a/tests/query_test/test_hdfs_caching.py
+++ b/tests/query_test/test_hdfs_caching.py
@@ -5,13 +5,15 @@
 import logging
 import os
 import pytest
 from copy import copy
-from subprocess import call
+from subprocess import check_call, call
+import time
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.impala_test_suite import *
 from tests.common.test_vector import *
 from tests.common.impala_cluster import ImpalaCluster
 from tests.common.test_dimensions import create_exec_option_dimension
 from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal
+from tests.util.filesystem_utils import get_fs_path
 from tests.util.shell_util import exec_process
 
 # End to end test that hdfs caching is working.
@@ -87,6 +89,67 @@ class TestHdfsCaching(ImpalaTestSuite):
     result = self.execute_query(query_string)
     assert(len(result.data) == 2)
 
+# A separate class has been created for "test_hdfs_caching_fallback_path" to make it
+# run as a part of exhaustive tests, which require the workload to be 'functional-query'.
+# TODO: Move this to TestHdfsCaching once we make exhaustive tests run for other workloads
[email protected]
[email protected]
[email protected]
+class TestHdfsCachingFallbackPath(ImpalaTestSuite):
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @SkipIfS3.hdfs_encryption
+  @SkipIfIsilon.hdfs_encryption
+  @SkipIfLocal.hdfs_encryption
+  def test_hdfs_caching_fallback_path(self, vector, unique_database, testid_checksum):
+    """Tests the query execution path where the hdfs cache read fails and execution
+    falls back to the normal read path. To reproduce this situation we rely on
+    IMPALA-3679, where ZCRs are not supported with encryption zones. This makes sure
+    ReadFromCache() fails and falls back to ReadRange() to read the scan range."""
+
+    if self.exploration_strategy() != 'exhaustive' or\
+       vector.get_value('table_format').file_format != 'text':
+      pytest.skip()
+
+    # Create a new encryption zone and copy the tpch.nation table data into it.
+    encrypted_table_dir = get_fs_path("/test-warehouse/" + testid_checksum)
+    create_query_sql = "CREATE EXTERNAL TABLE %s.cached_nation like tpch.nation "\
+        "LOCATION '%s'" % (unique_database, encrypted_table_dir)
+    check_call(["hdfs", "dfs", "-mkdir", encrypted_table_dir], shell=False)
+    check_call(["hdfs", "crypto", "-createZone", "-keyName", "testKey1", "-path",
+        encrypted_table_dir], shell=False)
+    check_call(["hdfs", "dfs", "-cp", get_fs_path("/test-warehouse/tpch.nation/*.tbl"),
+        encrypted_table_dir], shell=False)
+    # Reduce the scan range size to force the query to have multiple scan ranges.
+    exec_options = vector.get_value('exec_option')
+    exec_options['max_scan_range_length'] = 1024
+    try:
+      self.execute_query_expect_success(self.client, create_query_sql)
+      # Cache the table data.
+      self.execute_query_expect_success(self.client, "ALTER TABLE %s.cached_nation set "
+          "cached in 'testPool'" % unique_database)
+      # Wait till the whole path is cached. We set a deadline of 20 seconds for the path
+      # to be cached to make sure this doesn't loop forever in case of caching errors.
+      caching_deadline = time.time() + 20
+      while not is_path_fully_cached(encrypted_table_dir):
+        if time.time() > caching_deadline:
+          pytest.fail("Timed out caching path: " + encrypted_table_dir)
+        time.sleep(2)
+      self.execute_query_expect_success(self.client, "invalidate metadata "
+          "%s.cached_nation" % unique_database)
+      result = self.execute_query_expect_success(self.client, "select count(*) from "
+          "%s.cached_nation" % unique_database, exec_options)
+      assert(len(result.data) == 1)
+      assert(result.data[0] == '25')
+    except Exception as e:
+      pytest.fail("Failure in test_hdfs_caching_fallback_path: " + str(e))
+    finally:
+      check_call(["hdfs", "dfs", "-rm", "-r", "-f", "-skipTrash", encrypted_table_dir],
+          shell=False)
+
+
 @SkipIfS3.caching
 @SkipIfIsilon.caching
 @SkipIfLocal.caching
@@ -180,6 +243,16 @@ def drop_cache_directives_for_path(path):
   assert rc == 0, \
     "Error removing cache directive for path %s (%s, %s)" % (path, stdout, stderr)
 
+def is_path_fully_cached(path):
+  """Returns true if all the bytes of the path are cached, false otherwise."""
+  rc, stdout, stderr = exec_process("hdfs cacheadmin -listDirectives -stats -path %s" % path)
+  assert rc == 0
+  caching_stats = stdout.strip("\n").split("\n")[-1].split()
+  # Compare BYTES_NEEDED and BYTES_CACHED; the output format is as follows:
+  # "ID POOL REPL EXPIRY PATH BYTES_NEEDED BYTES_CACHED FILES_NEEDED FILES_CACHED"
+  return len(caching_stats) > 6 and caching_stats[5] == caching_stats[6]
+
+
 def get_cache_directive_for_path(path):
   rc, stdout, stderr = exec_process("hdfs cacheadmin -listDirectives -path %s" % path)
   assert rc == 0
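
For reference, is_path_fully_cached() above parses the output of
'hdfs cacheadmin -listDirectives -stats', whose last line is the directive
row. The snippet below sanity-checks that parsing against a hypothetical
sample (the directive ID, pool name, path, and byte counts are made up;
the column layout follows the header documented in the code above):

# Hypothetical output: a count line, a header line, then one row per directive.
sample_stdout = (
  "Found 1 entry\n"
  "  ID POOL     REPL EXPIRY PATH                BYTES_NEEDED BYTES_CACHED FILES_NEEDED FILES_CACHED\n"
  "   1 testPool    1 never  /test-warehouse/foo         2199         2199            1            1")
# Same parsing as is_path_fully_cached(): split the last line and compare
# BYTES_NEEDED (token 5) with BYTES_CACHED (token 6).
caching_stats = sample_stdout.strip("\n").split("\n")[-1].split()
assert len(caching_stats) > 6 and caching_stats[5] == caching_stats[6]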
