Repository: incubator-impala Updated Branches: refs/heads/master e89d7057a -> f15589573
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/effe973a/be/src/runtime/disk-io-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h index b70c9c8..e2a92ec 100644 --- a/be/src/runtime/disk-io-mgr.h +++ b/be/src/runtime/disk-io-mgr.h @@ -31,11 +31,12 @@ #include "common/hdfs.h" #include "common/object-pool.h" #include "common/status.h" +#include "runtime/disk-io-mgr-handle-cache.h" #include "runtime/thread-resource-mgr.h" +#include "util/aligned-new.h" #include "util/bit-util.h" #include "util/error-util.h" #include "util/internal-queue.h" -#include "util/lru-cache.h" #include "util/runtime-profile.h" #include "util/thread.h" @@ -196,39 +197,12 @@ class MemTracker; class DiskIoRequestContext; -class DiskIoMgr { +// This is cache line aligned because the FileHandleCache needs cache line alignment +// for its partitions. +class DiskIoMgr : public CacheLineAligned { public: class ScanRange; - /// This class is a small wrapper around the hdfsFile handle and the file system - /// instance which is needed to close the file handle in case of eviction. It - /// additionally encapsulates the last modified time of the associated file when it was - /// last opened. - class HdfsCachedFileHandle { - public: - - /// Constructor will open the file - HdfsCachedFileHandle(const hdfsFS& fs, const char* fname, int64_t mtime); - - /// Destructor will close the file handle - ~HdfsCachedFileHandle(); - - hdfsFile file() const { return hdfs_file_; } - - int64_t mtime() const { return mtime_; } - - /// This method is called to release acquired resources by the cached handle when it - /// is evicted. - static void Release(HdfsCachedFileHandle** h); - - bool ok() const { return hdfs_file_ != NULL; } - - private: - hdfsFS fs_; - hdfsFile hdfs_file_; - int64_t mtime_; - }; - /// Buffer struct that is used by the caller and IoMgr to pass read buffers. 
/// It is is expected that only one thread has ownership of this object at a /// time. @@ -306,7 +280,7 @@ class DiskIoMgr { /// true if the current scan range is complete bool eosr_; - /// Status of the read to this buffer. if status is not ok, 'buffer' is NULL + /// Status of the read to this buffer. if status is not ok, 'buffer' is nullptr Status status_; int64_t scan_range_offset_; @@ -327,6 +301,7 @@ class DiskIoMgr { public: hdfsFS fs() const { return fs_; } const char* file() const { return file_.c_str(); } + std::string* file_string() { return &file_; } int64_t offset() const { return offset_; } int64_t len() const { return len_; } int disk_id() const { return disk_id_; } @@ -334,9 +309,9 @@ class DiskIoMgr { protected: RequestRange(RequestType::type request_type) - : fs_(NULL), offset_(-1), len_(-1), disk_id_(-1), request_type_(request_type) {} + : fs_(nullptr), offset_(-1), len_(-1), disk_id_(-1), request_type_(request_type) {} - /// Hadoop filesystem that contains file_, or set to NULL for local filesystem. + /// Hadoop filesystem that contains file_, or set to nullptr for local filesystem. hdfsFS fs_; /// Path to file being read or written. @@ -364,12 +339,12 @@ class DiskIoMgr { BufferOpts(bool try_cache, int64_t mtime) : try_cache_(try_cache), mtime_(mtime), - client_buffer_(NULL), + client_buffer_(nullptr), client_buffer_len_(-1) {} /// Set options for an uncached read into an IoMgr-allocated buffer. static BufferOpts Uncached() { - return BufferOpts(false, NEVER_CACHE, NULL, -1); + return BufferOpts(false, NEVER_CACHE, nullptr, -1); } /// Set options to read the entire scan range into 'client_buffer'. The length of the @@ -400,7 +375,7 @@ class DiskIoMgr { /// NEVER_CACHE, caching is disabled. const int64_t mtime_; - /// A destination buffer provided by the client, NULL and -1 if no buffer. + /// A destination buffer provided by the client, nullptr and -1 if no buffer. 
uint8_t* const client_buffer_; const int64_t client_buffer_len_; }; @@ -416,7 +391,7 @@ class DiskIoMgr { virtual ~ScanRange(); /// Resets this scan range object with the scan range description. The scan range - /// is for bytes [offset, offset + len) in 'file' on 'fs' (which is NULL for the + /// is for bytes [offset, offset + len) in 'file' on 'fs' (which is nullptr for the /// local filesystem). The scan range must fall within the file bounds (offset >= 0 /// and offset + len <= file_length). 'disk_id' is the disk queue to add the range /// to. If 'expected_local' is true, a warning is generated if the read did not @@ -424,7 +399,7 @@ class DiskIoMgr { /// see the DiskIoMgr class comment and the BufferOpts comments for details. /// 'meta_data' is an arbitrary client-provided pointer for any auxiliary data. void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id, - bool expected_local, const BufferOpts& buffer_opts, void* meta_data = NULL); + bool expected_local, const BufferOpts& buffer_opts, void* meta_data = nullptr); void* meta_data() const { return meta_data_; } bool try_cache() const { return try_cache_; } @@ -433,7 +408,7 @@ class DiskIoMgr { /// Returns the next buffer for this scan range. buffer is an output parameter. /// This function blocks until a buffer is ready or an error occurred. If this is - /// called when all buffers have been returned, *buffer is set to NULL and Status::OK + /// called when all buffers have been returned, *buffer is set to nullptr and Status::OK /// is returned. /// Only one thread can be in GetNext() at any time. Status GetNext(BufferDescriptor** buffer) WARN_UNUSED_RESULT; @@ -474,7 +449,15 @@ class DiskIoMgr { int64_t MaxReadChunkSize() const; /// Opens the file for this range. This function only modifies state in this range. - Status Open(); + /// If 'use_file_handle_cache' is true and this is a local hdfs file, then this scan + /// range will not maintain an exclusive file handle. 
It will borrow an hdfs file + /// handle from the file handle cache for each Read(), so Open() does nothing. + /// If 'use_file_handle_cache' is false or this is a remote hdfs file or this is + /// a local OS file, Open() will maintain a file handle on the scan range for + /// exclusive use by this scan range. An exclusive hdfs file handle still comes + /// from the cache, but it is held for the entire duration of a scan range's lifetime. + /// All local OS files are opened using normal OS file APIs. + Status Open(bool use_file_handle_cache) WARN_UNUSED_RESULT; /// Closes the file for this range. This function only modifies state in this range. void Close(); @@ -484,6 +467,12 @@ class DiskIoMgr { Status Read(uint8_t* buffer, int64_t buffer_len, int64_t* bytes_read, bool* eosr) WARN_UNUSED_RESULT; + /// Get the read statistics from the Hdfs file handle and aggregate them to + /// the DiskIoRequestContext. This clears the statistics on this file handle. + /// It is safe to pass hdfsFile by value, as hdfsFile's underlying type is a + /// pointer. + void GetHdfsStatistics(hdfsFile fh); + /// Reads from the DN cache. On success, sets cached_buffer_ to the DN buffer /// and *read_succeeded to true. /// If the data is not cached, returns ok() and *read_succeeded is set to false. @@ -492,7 +481,7 @@ class DiskIoMgr { /// Pointer to caller specified metadata. This is untouched by the io manager /// and the caller can put whatever auxiliary data in here. - void* meta_data_; + void* meta_data_ = nullptr; /// If true, this scan range is expected to be cached. Note that this might be wrong /// since the block could have been uncached. In that case, the cached path @@ -506,18 +495,31 @@ class DiskIoMgr { /// TODO: we can do more with this bool expected_local_; - DiskIoMgr* io_mgr_; + /// Total number of bytes read remotely. This is necessary to maintain a count of + /// the number of remote scan ranges. 
Since IO statistics can be collected multiple + /// times for a scan range, it is necessary to keep some state about whether this + /// scan range has already been counted as remote. There is also a requirement to + /// log the number of unexpected remote bytes for a scan range. To solve both + /// requirements, maintain num_remote_bytes_ on the ScanRange and push it to the + /// reader_ once at the close of the scan range. + int64_t num_remote_bytes_; + + DiskIoMgr* io_mgr_ = nullptr; /// Reader/owner of the scan range - DiskIoRequestContext* reader_; + DiskIoRequestContext* reader_ = nullptr; /// File handle either to hdfs or local fs (FILE*) - /// - /// TODO: The pointer to HdfsCachedFileHandle is manually managed and should be - /// replaced by unique_ptr in C++11 + /// The hdfs file handle is only stored here in three cases: + /// 1. The file handle cache is off (max_cached_file_handles == 0). + /// 2. The scan range is using hdfs caching. + /// -OR- + /// 3. The hdfs file is expected to be remote (expected_local_ == false) + /// In each case, the scan range gets a file handle from the file handle cache + /// at Open() and holds it exclusively until Close() is called. union { - FILE* local_file_; - HdfsCachedFileHandle* hdfs_file_; + FILE* local_file_ = nullptr; + HdfsFileHandle* exclusive_hdfs_fh_; }; /// Tagged union that holds a buffer for the cases when there is a buffer allocated @@ -536,7 +538,7 @@ class DiskIoMgr { /// Valid and non-NULL if the external_buffer_tag_ is CACHED_BUFFER, which means /// that a cached read succeeded and all the bytes for the range are in this buffer. - struct hadoopRzBuffer* cached_buffer_; + struct hadoopRzBuffer* cached_buffer_ = nullptr; }; /// Lock protecting fields below. @@ -697,7 +699,7 @@ class DiskIoMgr { /// Returns the next unstarted scan range for this reader. When the range is returned, /// the disk threads in the IoMgr will already have started reading from it. 
The /// caller is expected to call ScanRange::GetNext on the returned range. - /// If there are no more unstarted ranges, NULL is returned. + /// If there are no more unstarted ranges, nullptr is returned. /// This call is blocking. Status GetNextRange(DiskIoRequestContext* reader, ScanRange** range) WARN_UNUSED_RESULT; @@ -732,6 +734,8 @@ class DiskIoMgr { int64_t bytes_read_dn_cache(DiskIoRequestContext* reader) const; int num_remote_ranges(DiskIoRequestContext* reader) const; int64_t unexpected_remote_bytes(DiskIoRequestContext* reader) const; + int cached_file_handles_hit_count(DiskIoRequestContext* reader) const; + int cached_file_handles_miss_count(DiskIoRequestContext* reader) const; /// Returns the read throughput across all readers. /// TODO: should this be a sliding window? This should report metrics for the @@ -763,16 +767,20 @@ class DiskIoMgr { /// for debugging. bool Validate() const; - /// Given a FS handle, name and last modified time of the file, tries to open that file - /// and return an instance of HdfsCachedFileHandle. In case of an error returns NULL. - HdfsCachedFileHandle* OpenHdfsFile(const hdfsFS& fs, const char* fname, int64_t mtime); + /// Given a FS handle, name and last modified time of the file, gets an HdfsFileHandle + /// from the file handle cache. On success, records statistics about whether this was + /// a cache hit or miss in the `reader` as well as at the system level. In case of an + /// error returns nullptr. + HdfsFileHandle* GetCachedHdfsFileHandle(const hdfsFS& fs, + std::string* fname, int64_t mtime, DiskIoRequestContext *reader); - /// When the file handle is no longer in use by the scan range, return it and try to - /// unbuffer the handle. If unbuffering, closing sockets and dropping buffers in the - /// libhdfs client, is not supported, close the file handle. If the unbuffer operation - /// is supported, put the file handle together with the mtime in the LRU cache for - /// later reuse. 
- void CacheOrCloseFileHandle(const char* fname, HdfsCachedFileHandle* fid, bool close); + /// Releases a file handle back to the file handle cache when it is no longer in use. + void ReleaseCachedHdfsFileHandle(std::string* fname, HdfsFileHandle* fid); + + /// Reopens a file handle by destroying the file handle and getting a fresh + /// file handle from the cache. Returns an error if the file could not be reopened. + Status ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime, + HdfsFileHandle** fid); /// Garbage collect unused I/O buffers up to 'bytes_to_free', or all the buffers if /// 'bytes_to_free' is -1. @@ -823,7 +831,7 @@ class DiskIoMgr { ThreadGroup disk_thread_group_; /// Options object for cached hdfs reads. Set on startup and never modified. - struct hadoopRzOptions* cached_read_options_; + struct hadoopRzOptions* cached_read_options_ = nullptr; /// True if the IoMgr should be torn down. Worker threads watch for this to /// know to terminate. This variable is read/written to by different threads. @@ -876,10 +884,13 @@ class DiskIoMgr { /// round-robin assignment for that case. static AtomicInt32 next_disk_id_; + // Number of file handle cache partitions to use + static const size_t NUM_FILE_HANDLE_CACHE_PARTITIONS = 16; + // Caching structure that maps file names to cached file handles. The cache has an upper // limit of entries defined by FLAGS_max_cached_file_handles. Evicted cached file // handles are closed. - FifoMultimap<std::string, HdfsCachedFileHandle*> file_handle_cache_; + FileHandleCache<NUM_FILE_HANDLE_CACHE_PARTITIONS> file_handle_cache_; /// Returns the index into free_buffers_ for a given buffer size int free_buffers_idx(int64_t buffer_size); @@ -906,7 +917,7 @@ class DiskIoMgr { /// Returns a buffer desc object which can now be used for another reader. 
void ReturnBufferDesc(BufferDescriptor* desc); - /// Disassociates the desc->buffer_ memory from 'desc' (which cannot be NULL), either + /// Disassociates the desc->buffer_ memory from 'desc' (which cannot be nullptr), either /// freeing it or returning it to 'free_buffers_'. Memory tracking is updated to /// reflect the transfer of ownership from desc->mem_tracker_ to the disk I/O mgr. void FreeBufferMemory(BufferDescriptor* desc); @@ -953,9 +964,9 @@ class DiskIoMgr { void ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader, ScanRange* range); /// Try to allocate the next buffer for the scan range, returning the new buffer - /// if successful. If 'reader' is cancelled, cancels the range and returns NULL. + /// if successful. If 'reader' is cancelled, cancels the range and returns nullptr. /// If there is memory pressure and buffers are already queued, adds the range - /// to the blocked ranges and returns NULL. + /// to the blocked ranges and returns nullptr. BufferDescriptor* TryAllocateNextBufferForRange(DiskQueue* disk_queue, DiskIoRequestContext* reader, ScanRange* range, int64_t buffer_size); }; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/effe973a/tests/custom_cluster/test_hdfs_fd_caching.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_hdfs_fd_caching.py b/tests/custom_cluster/test_hdfs_fd_caching.py index c030c8c..bcf50e6 100644 --- a/tests/custom_cluster/test_hdfs_fd_caching.py +++ b/tests/custom_cluster/test_hdfs_fd_caching.py @@ -58,41 +58,41 @@ class TestHdfsFdCaching(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( - impalad_args="--max_cached_file_handles=5", + impalad_args="--max_cached_file_handles=16", catalogd_args="--load_catalog_in_background=false") def test_scan_does_cache_fd(self, vector): """Tests that an hdfs scan will lead to caching HDFS file descriptors.""" # Maximum number of file handles cached - assert 
self.max_cached_handles() <= 5 - # One table, one file, one handle + assert self.max_cached_handles() <= 16 + # The table has one file, so there should be one more handle cached after the + # first select. num_handles_before = self.cached_handles() - self.execute_query("select * from cachefd.simple limit 1", vector=vector) + self.execute_query("select * from cachefd.simple", vector=vector) num_handles_after = self.cached_handles() - assert self.max_cached_handles() <= 5 + assert self.max_cached_handles() <= 16 - # Should have at least one more handle cached and not more than three more - # as there are three Impalads. - assert (num_handles_before + 1) <= num_handles_after <= (num_handles_before + 3) + # Should have one more file handle + assert num_handles_after == (num_handles_before + 1) # No open handles if scanning is finished assert self.outstanding_handles() == 0 # No change when reading the table again for x in range(10): - self.execute_query("select * from cachefd.simple limit 1", vector=vector) + self.execute_query("select * from cachefd.simple", vector=vector) + assert self.cached_handles() == num_handles_after + assert self.max_cached_handles() <= 16 + assert self.outstanding_handles() == 0 - assert self.max_cached_handles() <= 5 - assert num_handles_after == self.cached_handles() - assert self.outstanding_handles() == 0 - - # Create more files - self.create_n_files(10) + # Create more files. This means there are more files than the cache size. + # The cache size should still be enforced. 
+ self.create_n_files(100) # Read all the files of the table and make sure no FD leak for x in range(10): self.execute_query("select count(*) from cachefd.simple;", vector=vector) - assert self.max_cached_handles() <= 5 + assert self.max_cached_handles() <= 16 assert self.outstanding_handles() == 0 def cached_handles(self): http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/effe973a/tests/metadata/test_refresh_partition.py ---------------------------------------------------------------------- diff --git a/tests/metadata/test_refresh_partition.py b/tests/metadata/test_refresh_partition.py index a8b5042..187055c 100644 --- a/tests/metadata/test_refresh_partition.py +++ b/tests/metadata/test_refresh_partition.py @@ -144,10 +144,12 @@ class TestRefreshPartition(ImpalaTestSuite): self.run_stmt_in_hive( 'alter table %s drop partition (y=333, z=5309)' % table_name) - # Query the table and check for expected error. + + # Query the table. With file handle caching, this may not produce an error, + # because the file handles are still open in the cache. If the system does + # produce an error, it should be the expected error. 
try: self.client.execute("select * from %s" % table_name) - assert False, "Query was expected to fail" except ImpalaBeeswaxException as e: assert expected_error in str(e) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/effe973a/tests/query_test/test_hdfs_fd_caching.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_hdfs_fd_caching.py b/tests/query_test/test_hdfs_fd_caching.py index 6961f3b..fa03b24 100644 --- a/tests/query_test/test_hdfs_fd_caching.py +++ b/tests/query_test/test_hdfs_fd_caching.py @@ -21,74 +21,104 @@ import pytest from tests.common.impala_cluster import ImpalaCluster from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.skip import SkipIfS3, SkipIfADLS +from tests.common.test_vector import ImpalaTestDimension +from subprocess import call +from tests.util.filesystem_utils import FILESYSTEM_PREFIX +# Modifications to test with the file handle cache +MODIFICATION_TYPES=["delete_files", "delete_directory", "move_file", "append"] @SkipIfS3.caching @SkipIfADLS.caching class TestHdfsFdCaching(ImpalaTestSuite): """ - This test suite tests the behavior of HDFS file descriptor caching by evaluating the - metrics exposed by the Impala daemon. + This test suite tests the behavior of HDFS file descriptor caching, including + potential error cases such as file deletes. 
""" - NUM_ROWS = 10000 - @classmethod def file_format_constraint(cls, v): - return v.get_value('table_format').file_format in ["parquet"] + return v.get_value('table_format').file_format in ["text"] @classmethod def add_test_dimensions(cls): super(TestHdfsFdCaching, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('modification_type',\ + *MODIFICATION_TYPES)) cls.ImpalaTestMatrix.add_constraint(cls.file_format_constraint) @classmethod def get_workload(cls): return 'functional-query' + def setup_ext_table(self, vector, unique_database, new_table_location): + # Use HDFS commands to clone the table's files at the hdfs level + old_table_location = "{0}/test-warehouse/tinytable".format(FILESYSTEM_PREFIX) + call(["hdfs", "dfs", "-mkdir", new_table_location]) + call(["hdfs", "dfs", "-cp", old_table_location + "/*", new_table_location]) - def setup_method(self, method): - self.cleanup_db("cachefd") - self.client.execute("create database cachefd") - self.client.execute("create table cachefd.simple(id int, col1 int, col2 int) " - "stored as parquet") - buf = "insert into cachefd.simple values" - self.client.execute(buf + ", ".join(["({0},{0},{0})".format(x) for x in range(self.NUM_ROWS)])) - - def teardown_method(self, methd): - self.cleanup_db("cachedfd") - + # Create an external table with the new files (tinytable has two string columns) + create_table = "create external table {0}.t1 (a string, b string) "\ + + "row format delimited fields terminated by \',\' location \'{1}\'" + self.client.execute(create_table.format(unique_database, new_table_location)) @pytest.mark.execute_serially - def test_simple_scan(self, vector): - """Tests that in the default configuration, file handle caching is disabled and no - file handles are cached.""" - - num_handles_before = self.cached_handles() - assert 0 == num_handles_before - self.execute_query("select * from cachefd.simple limit 1", vector=vector) - num_handles_after = self.cached_handles() - 
assert 0 == num_handles_after - assert num_handles_after == num_handles_before - assert 0 == self.outstanding_handles() - - # No change when reading the table again - for x in range(10): - self.execute_query("select * from cachefd.simple limit 1", vector=vector) - - # TODO This assertion fails reliably in the Kudu feature branch build for reasons yet - # unknown, since it seems unrelated to other changes. Once the root cause for the - # failure is known this assertion should be uncommented. - # assert num_handles_after == self.cached_handles() - assert 0 == self.outstanding_handles() - - def cached_handles(self): - return self.get_agg_metric("impala-server.io.mgr.num-cached-file-handles") - - def outstanding_handles(self): - return self.get_agg_metric("impala-server.io.mgr.num-file-handles-outstanding") - - def get_agg_metric(self, key, fun=sum): - cluster = ImpalaCluster() - return fun([s.service.get_metric_value(key) for s - in cluster.impalads]) + def test_file_modifications(self, vector, unique_database): + """Tests file modifications on a file that is cached in the file handle cache.""" + + new_table_location = "{0}/test-warehouse/{1}".format(FILESYSTEM_PREFIX,\ + unique_database) + self.setup_ext_table(vector, unique_database, new_table_location) + + # Query the table (puts file handle in the cache) + count_query = "select count(*) from {0}.t1".format(unique_database) + original_result = self.execute_query_expect_success(self.client, count_query) + assert(original_result.data[0] == '3') + + # Do the modification based on the test settings + modification_type = vector.get_value('modification_type') + if (modification_type == 'delete_files'): + # Delete the data file (not the directory) + call(["hdfs", "dfs", "-rm", "-skipTrash", new_table_location + "/*"]) + elif (modification_type == 'delete_directory'): + # Delete the whole directory (including data file) + call(["hdfs", "dfs", "-rm", "-r", "-skipTrash", new_table_location]) + elif (modification_type == 
'move_file'): + # Move the file underneath the directory + call(["hdfs", "dfs", "-mv", new_table_location + "/data.csv", \ + new_table_location + "/data.csv.moved"]) + elif (modification_type == 'append'): + # Append a copy of the hdfs file to itself (duplicating all entries) + local_tmp_location = "/tmp/{0}.data.csv".format(unique_database) + call(["hdfs", "dfs", "-copyToLocal", new_table_location + "/data.csv", \ + local_tmp_location]) + call(["hdfs", "dfs", "-appendToFile", local_tmp_location, \ + new_table_location + "/data.csv"]) + call(["rm", local_tmp_location]) + else: + assert(False) + + # The query might fail, but nothing should crash. + self.execute_query(count_query) + + # Invalidate metadata + invalidate_metadata_sql = "invalidate metadata {0}.t1".format(unique_database) + self.execute_query_expect_success(self.client, invalidate_metadata_sql) + + # Verify that nothing crashes and the query should succeed + new_result = self.execute_query_expect_success(self.client, count_query) + if (modification_type == 'move_file'): + assert(new_result.data[0] == '3') + elif (modification_type == 'delete_files' or \ + modification_type == 'delete_directory'): + assert(new_result.data[0] == '0') + elif (modification_type == 'append'): + assert(new_result.data[0] == '6') + + # Drop table + drop_table_sql = "drop table {0}.t1".format(unique_database) + self.execute_query_expect_success(self.client, drop_table_sql) + + # Cleanup directory (which may already be gone) + call(["hdfs", "dfs", "-rm", "-r", "-skipTrash", new_table_location]) +
