This is an automated email from the ASF dual-hosted git repository. laszlog pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit c209b50867d846029b8f149ecbd0187d6eae9455 Author: Eyizoha <[email protected]> AuthorDate: Wed Feb 22 17:59:07 2023 +0800 IMPALA-11904: Data cache support dumping for reloading Data cache mainly includes cache metadata and cache files. The cache files are located on the disk and are responsible for storing cached data content, while the cache metadata is located in the memory and is responsible for indexing to the cache file according to the cache key. Before this patch, if the impalad process exits, the cache metadata will be lost. After the Impalad process restarts, we cannot reuse the cache file even though it is still on the disk, because there is no corresponding cache metadata to index it. This patch implements the dump and load functions of the data cache. After enabling the dump-and-load function by setting 'data_cache_keep_across_restarts=true', when the Impalad process is closed by graceful shutdown (kill -SIGRTMIN $pid), the data cache will collect the cache metadata and dump it into the cache directory. When the Impalad process restarts, it will try to load the dumped files on the disk to restore the original cache metadata, so that the existing cache files can be reused without refilling the cache. The cache can be safely dumped during query execution, because before the dump starts, the data cache will be set to read-only to prevent the inconsistency between the metadata dump and the cache file. Note that the dump files will also use disk space. In testing, the size of the dump file is generally not more than 0.5% of the size of all cache files. Testing: - Add DataCacheTest,#SetReadOnly Used to test whether set/revoke read-only takes effect, even when there are writes in progress. - Add DataCacheTest,#DumpAndLoad Used to test whether the original cache contents can be read after a data cache dump and reload. 
- Add DataCacheTest,#ChangeConfBeforeLoad Used to test whether the original cache contents can be read after the data cache is dumped and the configuration is changed and then reloaded. - Add end-to-end test in test_data_cache.py Perform end-to-end testing in a custom cluster, including executing queries, gracefully restarting, verifying metrics, re-executing the same query and verifying hits/misses. This also includes testing the modification of cache capacity and restart, as well as testing restarts while queries are in progress. Change-Id: Id867f4fc7343898e4906332c3caa40eb57a03101 Reviewed-on: http://gerrit.cloudera.org:8080/19532 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Joe McDonnell <[email protected]> --- CMakeLists.txt | 2 +- be/src/runtime/io/data-cache-test.cc | 132 +++++++++- be/src/runtime/io/data-cache.cc | 438 ++++++++++++++++++++++++++++++-- be/src/runtime/io/data-cache.h | 62 ++++- be/src/runtime/io/disk-io-mgr.cc | 14 + be/src/runtime/io/disk-io-mgr.h | 4 + be/src/scheduling/executor-group.cc | 15 ++ be/src/service/impala-server.cc | 4 + be/src/util/cache/cache-internal.h | 16 ++ be/src/util/cache/cache.h | 8 + be/src/util/cache/lirs-cache.cc | 34 +++ be/src/util/cache/rl-cache.cc | 14 + tests/common/impala_cluster.py | 13 + tests/custom_cluster/test_data_cache.py | 209 +++++++++++---- 14 files changed, 883 insertions(+), 82 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index e1a02ddeb..7b86e421d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -198,7 +198,7 @@ function(IMPALA_ADD_THIRDPARTY_LIB NAME HEADER STATIC_LIB SHARED_LIB) endfunction() -find_package(Boost REQUIRED COMPONENTS thread regex filesystem system date_time random locale) +find_package(Boost REQUIRED COMPONENTS thread regex filesystem system date_time random locale serialization) # Mark Boost as a system header to avoid compile warnings. 
include_directories(SYSTEM ${Boost_INCLUDE_DIRS}) message(STATUS "Boost include dir: " ${Boost_INCLUDE_DIRS}) diff --git a/be/src/runtime/io/data-cache-test.cc b/be/src/runtime/io/data-cache-test.cc index b1540cf59..61e14f89a 100644 --- a/be/src/runtime/io/data-cache-test.cc +++ b/be/src/runtime/io/data-cache-test.cc @@ -72,6 +72,7 @@ DECLARE_int32(max_data_cache_trace_file_size); DECLARE_int32(data_cache_trace_percentage); DECLARE_int32(data_cache_num_async_write_threads); DECLARE_string(data_cache_async_write_buffer_limit); +DECLARE_bool(data_cache_keep_across_restarts); namespace impala { namespace io { @@ -120,9 +121,13 @@ class DataCacheBaseTest : public testing::Test { // per thread; If false, the prefix for filenames is empty // 'expect_misses' : If true, expect there will be cache misses when reading // from the cache + // 'test_readonly' : Only for testing read-only, if true, set data cache + // read-only after create all threads, and check number of + // writes later // void MultiThreadedReadWrite(DataCache* cache, int64_t max_start_offset, - bool use_per_thread_filename, bool expect_misses, double* write_time_ms) { + bool use_per_thread_filename, bool expect_misses, double* write_time_ms, + bool test_readonly = false) { // Barrier to synchronize all threads so no thread will start probing the cache until // all insertions are done. CountingBarrier barrier(NUM_THREADS); @@ -140,6 +145,14 @@ class DataCacheBaseTest : public testing::Test { &barrier, &num_misses[i], &write_times_us[i]), &thread)); threads.emplace_back(std::move(thread)); } + + int64_t num_writes = 0; + if (test_readonly) { + // Set read-only before the writes are complete, and record the total write count of + // the cache after read-only set. 
+ num_writes = cache->SetDataCacheReadOnly(); + } + int cache_misses = 0; int64_t avg_write_time_us = 0; for (int i = 0; i < NUM_THREADS; ++i) { @@ -154,6 +167,13 @@ class DataCacheBaseTest : public testing::Test { ASSERT_EQ(0, cache_misses); } + if (test_readonly) { + // Revoke read-only and check the total write count, which should not have increased + // due to read-only. + ASSERT_EQ(num_writes, cache->RevokeDataCacheReadOnly()); + return; + } + // Verify the backing files don't exceed size limits. ASSERT_OK(cache->CloseFilesAndVerifySizes()); } @@ -869,6 +889,116 @@ TEST_P(DataCacheTest, OutOfWriteBufferLimit) { EXPECT_LT(count, NUM_CACHE_ENTRIES_NO_EVICT); } +TEST_P(DataCacheTest, SetReadOnly) { + const int64_t cache_size = DEFAULT_CACHE_SIZE; + DataCache cache(Substitute("$0:$1", data_cache_dirs()[0], std::to_string(cache_size))); + ASSERT_OK(cache.Init()); + + int64_t max_start_offset = NUM_CACHE_ENTRIES_NO_EVICT; + bool use_per_thread_filename = false; + // Set read-only for the first multi-threaded read-write test, which should expect there + // will be cache misses when reading from the cache. + bool expect_misses = true; + bool test_readonly = true; + double wait_time_ms = 0; + MultiThreadedReadWrite(&cache, max_start_offset, use_per_thread_filename, + expect_misses, &wait_time_ms, test_readonly); + LOG(INFO) << "SetReadOnly, " + << FLAGS_data_cache_eviction_policy << ", " + << (FLAGS_data_cache_num_async_write_threads > 0 ? "Async" : "Sync") + << ", write_time_ms: " << wait_time_ms; + + // For the second test, do not set read-only, and no cache misses should occur. + expect_misses = false; + test_readonly = false; + MultiThreadedReadWrite(&cache, max_start_offset, use_per_thread_filename, + expect_misses, &wait_time_ms, test_readonly); + LOG(INFO) << "SetReadOnly, " + << FLAGS_data_cache_eviction_policy << ", " + << (FLAGS_data_cache_num_async_write_threads > 0 ? 
"Async" : "Sync") + << ", write_time_ms: " << wait_time_ms; +} + +TEST_P(DataCacheTest, DumpAndLoad) { + FLAGS_data_cache_keep_across_restarts = true; + + const int64_t cache_size = DEFAULT_CACHE_SIZE; + unique_ptr<DataCache> cache = make_unique<DataCache>(Substitute("$0:$1", + data_cache_dirs()[0], std::to_string(cache_size))); + ASSERT_OK(cache->Init()); + + int64_t max_start_offset = NUM_CACHE_ENTRIES_NO_EVICT; + for (int64_t offset = 0; offset < max_start_offset; ++offset) { + ASSERT_TRUE(cache->Store(FNAME, MTIME, offset, test_buffer() + offset, + TEMP_BUFFER_SIZE)); + } + WaitForAsyncWrite(*cache); + + // After completing the normal write, dump the cached data and then create a new data + // cache to load the dump file to initialize. + ASSERT_OK(cache->Dump()); + cache.reset(new DataCache(Substitute("$0:$1", data_cache_dirs()[0], + std::to_string(cache_size)))); + ASSERT_OK(cache->Init()); + + // There is no need to write again, just lookup all entries written before the dump one + // by one, should all hit the cache. 
+ uint8_t buffer[TEMP_BUFFER_SIZE]; + for (int64_t offset = 0; offset < max_start_offset; ++offset) { + memset(buffer, 0, TEMP_BUFFER_SIZE); + int64_t bytes_read = + cache->Lookup(FNAME, MTIME, offset, TEMP_BUFFER_SIZE, buffer); + ASSERT_EQ(TEMP_BUFFER_SIZE, bytes_read); + ASSERT_EQ(0, memcmp(buffer, test_buffer() + offset, TEMP_BUFFER_SIZE)); + } +} + +TEST_P(DataCacheTest, ChangeConfBeforeLoad) { + FLAGS_data_cache_keep_across_restarts = true; + + const int64_t cache_size = DEFAULT_CACHE_SIZE; + unique_ptr<DataCache> cache = make_unique<DataCache>(Substitute("$0:$1", + data_cache_dirs()[0], std::to_string(cache_size))); + ASSERT_OK(cache->Init()); + + int64_t max_start_offset = NUM_CACHE_ENTRIES_NO_EVICT; + for (int64_t offset = 0; offset < max_start_offset; ++offset) { + ASSERT_TRUE(cache->Store(FNAME, MTIME, offset, test_buffer() + offset, + TEMP_BUFFER_SIZE)); + } + WaitForAsyncWrite(*cache); + + // After completing the normal write, dump the cached data and change some configs, then + // create a new data cache to load the dump file to initialize. + ASSERT_OK(cache->Dump()); + const int64_t new_cache_size = DEFAULT_CACHE_SIZE / 2; + FLAGS_data_cache_eviction_policy = + FLAGS_data_cache_eviction_policy == "LRU" ? "LIRS" : "LRU"; + cache.reset(new DataCache(Substitute("$0:$1", data_cache_dirs()[0], + std::to_string(new_cache_size)))); + ASSERT_OK(cache->Init()); + + // Since we have reduced the data cache size and the previous writes have already + // reached the capacity limit, some data must have been discarded during the load + // process due to the new capacity limit. Therefore, there should be some misses. 
+ int num_misses = 0; + uint8_t buffer[TEMP_BUFFER_SIZE]; + for (int64_t offset = 0; offset < max_start_offset; ++offset) { + memset(buffer, 0, TEMP_BUFFER_SIZE); + int64_t bytes_read = + cache->Lookup(FNAME, MTIME, offset, TEMP_BUFFER_SIZE, buffer); + if (bytes_read == TEMP_BUFFER_SIZE) { + ASSERT_EQ(0, memcmp(buffer, test_buffer() + offset, TEMP_BUFFER_SIZE)); + } else { + ASSERT_EQ(bytes_read, 0); + ++num_misses; + } + } + + EXPECT_GT(num_misses, 0); + EXPECT_LT(num_misses, NUM_CACHE_ENTRIES_NO_EVICT); +} + } // namespace io } // namespace impala diff --git a/be/src/runtime/io/data-cache.cc b/be/src/runtime/io/data-cache.cc index fa532f165..58845ede9 100644 --- a/be/src/runtime/io/data-cache.cc +++ b/be/src/runtime/io/data-cache.cc @@ -23,8 +23,17 @@ #include <string.h> #include <unistd.h> #include <sstream> +#include <fstream> +#include <unordered_map> +#include <unordered_set> +#include <sys/stat.h> #include <glog/logging.h> +#include <boost/archive/binary_oarchive.hpp> +#include <boost/archive/binary_iarchive.hpp> +#include <boost/serialization/access.hpp> +#include <boost/serialization/vector.hpp> +#include <boost/thread/pthread/shared_mutex.hpp> #include "common/compiler-util.h" #include "exec/kudu/kudu-util.h" @@ -66,6 +75,7 @@ using kudu::Slice; using kudu::WritableFile; using strings::SkipEmpty; using strings::Split; +using boost::shared_mutex; #ifdef NDEBUG #define ENABLE_CHECKSUMMING (false) @@ -119,11 +129,14 @@ DEFINE_string(data_cache_eviction_policy, "LRU", DEFINE_string(data_cache_async_write_buffer_limit, "1GB", "(Experimental) Limit of the total buffer size used by asynchronous store tasks."); +DECLARE_bool(data_cache_keep_across_restarts); + namespace impala { namespace io { static const int64_t PAGE_SIZE = 1L << 12; const char* DataCache::Partition::CACHE_FILE_PREFIX = "impala-cache-file-"; +const char* DataCache::Partition::DUMP_FILE_NAME = "impala-cache-metadata"; const int MAX_FILE_DELETER_QUEUE_SIZE = 500; // This large value for the queue 
size is harmless because the total size of the entries @@ -180,6 +193,20 @@ class DataCache::CacheFile { return Status::OK(); } + static Status Open(std::string path, bool allow_append, int64_t current_offset, + std::unique_ptr<CacheFile>* cache_file_ptr) { + unique_ptr<CacheFile> cache_file(new CacheFile(path)); + kudu::RWFileOptions opts; + opts.mode = kudu::Env::OpenMode::MUST_EXIST; + KUDU_RETURN_IF_ERROR( + kudu::Env::Default()->NewRWFile(opts, path, &cache_file->file_), + "Failed to open cache file"); + cache_file->allow_append_ = allow_append; + cache_file->current_offset_.Store(current_offset); + *cache_file_ptr = std::move(cache_file); + return Status::OK(); + } + // Close the underlying file so it cannot be read or written to anymore. void Close() { // Explicitly hold the lock in write mode to block all readers. This ensures that @@ -199,6 +226,7 @@ class DataCache::CacheFile { // Close the underlying file and delete it from the filesystem. void DeleteFile() { Close(); + if (readonly_) return; DCHECK(!file_); kudu::Status status = kudu::Env::Default()->DeleteFile(path_); if (!status.ok()) { @@ -252,9 +280,10 @@ class DataCache::CacheFile { bool Write(int64_t offset, const uint8_t* buffer, int64_t buffer_len) { DCHECK_EQ(offset % PAGE_SIZE, 0); DCHECK_LE(offset, current_offset_.Load()); + if (UNLIKELY(readonly_)) return false; // Hold the lock in shared mode to check if 'file_' is not closed already. 
kudu::shared_lock<rw_spinlock> lock(lock_.get_lock()); - if (UNLIKELY(!file_)) return false; + if (UNLIKELY(!file_ || readonly_)) return false; DCHECK_LE(offset + buffer_len, current_offset_.Load()); kudu::Status status = file_->Write(offset, Slice(buffer, buffer_len)); if (UNLIKELY(!status.ok())) { @@ -268,9 +297,10 @@ class DataCache::CacheFile { void PunchHole(int64_t offset, int64_t hole_size) { DCHECK_EQ(offset % PAGE_SIZE, 0); DCHECK_EQ(hole_size % PAGE_SIZE, 0); + if (UNLIKELY(readonly_)) return; // Hold the lock in shared mode to check if 'file_' is not closed already. kudu::shared_lock<rw_spinlock> lock(lock_.get_lock()); - if (UNLIKELY(!file_)) return; + if (UNLIKELY(!file_ || readonly_)) return; DCHECK_LE(offset + hole_size, current_offset_.Load()); kudu::Status status = file_->PunchHole(offset, hole_size); if (UNLIKELY(!status.ok())) { @@ -279,8 +309,43 @@ class DataCache::CacheFile { } } + Status Flush() { + std::unique_lock<percpu_rwlock> lock(lock_); + // If the file is already closed, nothing to do. + if (!file_) return Status::OK(); + kudu::Status status = file_->Sync(); + if (UNLIKELY(!status.ok())) { + return Status(status.message().ToString()); + } + return Status::OK(); + } + + void SetReadOnly() { + readonly_ = true; + std::unique_lock<percpu_rwlock> lock(lock_); + } + + void RevokeReadOnly() { + std::unique_lock<percpu_rwlock> lock(lock_); + readonly_ = false; + } + + int64_t mtime() { + // If the file is already closed, don't care about its mtime. + if (!file_) return -1; + struct stat buf; + stat(path_.c_str(), &buf); + return buf.st_mtime; + } + const string& path() const { return path_; } + bool closed() const { return file_ == nullptr; } + + bool allow_append() const { return allow_append_; } + + int64_t current_offset() const { return current_offset_.Load(); } + private: /// Full path of the backing file in the local storage. 
const string path_; @@ -291,15 +356,19 @@ class DataCache::CacheFile { /// True iff it's okay to append to this backing file. bool allow_append_ = true; + /// The backing file cannot be written or punched hole if this is true. + bool readonly_ = false; + /// The current offset in the file to append to on next insert. AtomicInt64 current_offset_; /// This is a reader-writer lock used for synchronization with the deleter thread. - /// It is taken in write mode in Close() and shared mode everywhere else. It's expected - /// that all places except for Close() check that 'file_' is not NULL with the lock held - /// in shared mode while Close() ensures that no thread is holding the lock in shared - /// mode so it's safe to close the file. The file can no longer be read, written or hole - /// punched after it has been closed. The only operation allowed is to deletion. + /// It is taken in write mode in Close()/SetReadOnly()/RevokeReadOnly() and shared mode + /// everywhere else. It's expected that all places except for + /// Close()/SetReadOnly()/RevokeReadOnly() check that 'file_' is not NULL with the lock + /// held in shared mode while Close() ensures that no thread is holding the lock in + /// shared mode so it's safe to close the file. The file can no longer be read, written + /// or hole punched after it has been closed. The only operation allowed is to deletion. percpu_rwlock lock_; /// C'tor of CacheFile to be called by Create() only. @@ -386,6 +455,111 @@ struct DataCache::CacheKey { faststring key_; }; +/// The helper class to dump/load cache metadata. DumpData is the intermediary between +/// cache metadata and dump file. Data cache partition can push data into DumpData and +/// serialize it to disk or deserialize a DumpData from disk to load the metadata. 
+class DataCache::Partition::DumpData { + public: + + void serialize(std::ofstream& os) { + boost::archive::binary_oarchive oa(os); + oa << *this; + } + + void deserialize(std::ifstream& is) { + boost::archive::binary_iarchive ia(is); + ia >> *this; + } + + private: + friend class Partition; + friend class boost::serialization::access; + + /// The dump struct of CacheFile. + struct CacheFileData { + CacheFileData() = default; + + CacheFileData(string path, bool allow_append, int64_t current_offset, int64_t mtime) + : path(path), allow_append(allow_append), current_offset(current_offset), + mtime(mtime) { } + + template<class Archive> + void serialize(Archive& ar, const unsigned int version) { + ar & path; + ar & allow_append; + ar & current_offset; + ar & mtime; + } + + /// Corresponds to 'CacheFile::path_'. + string path; + + /// Corresponds to 'CacheFile::allow_append_'. + bool allow_append; + + /// Corresponds to 'CacheFile::current_offset_'. + int64_t current_offset; + + /// The last modification time of the cache file, for check when load to prevent + /// accidental modification of the cache file. + int64_t mtime; + }; + + /// The dump struct of a pair of CacheKey and CacheEntry. + struct CacheKeyEntryData { + CacheKeyEntryData() = default; + + CacheKeyEntryData(string key, int64_t index, int64_t offset, int64_t len, + uint64_t checksum) + : key(key), index(index), offset(offset), len(len), checksum(checksum) { } + + template<class Archive> + void serialize(Archive& ar, const unsigned int version) { + ar & key; + ar & index; + ar & offset; + ar & len; + ar & checksum; + } + + /// Corresponds to CacheKey. + string key; + + /// Corresponds to CacheEntry, but uses 'index' instead of the 'file_' to point to the + /// cache file. When we dump the cache metadata, the 'file_' is meaningless because + /// the cache file objects cannot still be at the same address after reloading. 
So + /// during the dump, we use the corresponding file's index in the 'cache_files_data' + /// instead of file pointer, so that when reloading, the correct file pointer can be + /// find based on the index. + int64_t index; + int64_t offset; + int64_t len; + uint64_t checksum; + }; + + template<class Archive> + void serialize(Archive& ar, const unsigned int version) { + ar & cache_files_data; + ar & meta_cache_data; + } + + /// Corresponds to 'Partition::cache_files_', but does not include closed files, because + /// closed files can no longer be read or written, and the underlying file should have + /// been deleted already. + vector<CacheFileData> cache_files_data; + + /// Corresponds to 'Partition::meta_cache_', but does not include entries that the cache + /// file is closed, because these entries can no longer be hit and do not need to be + /// dumped. + vector<CacheKeyEntryData> meta_cache_data; + + /// The mapping between the cache file pointer and the index of 'cache_files_data' to + /// convert the file pointer to the index when the cache entry is dumped. It is built + /// when cache files are dumped, only used when 'meta_cache_' is dumped and will not be + /// dumped to disk. + std::unordered_map<CacheFile*, int64_t> file_indexs; +}; + /// The class to abstract store behavior, holds a temporary buffer copied from the source /// buffer until store complete. 
class DataCache::StoreTask { @@ -473,14 +647,19 @@ Status DataCache::Partition::CreateCacheFile() { return Status::OK(); } -Status DataCache::Partition::DeleteExistingFiles() const { +Status DataCache::Partition::DeleteUntrackedFiles() const { DCHECK(!trace_replay_); + std::unordered_set<string> tracked_file_paths; + for (const auto& file : cache_files_) { + tracked_file_paths.insert(file->path()); + } vector<string> entries; RETURN_IF_ERROR(FileSystemUtil::Directory::GetEntryNames(path_, &entries, 0, FileSystemUtil::Directory::EntryType::DIR_ENTRY_REG)); for (const string& entry : entries) { - if (entry.find(CACHE_FILE_PREFIX) == 0) { + if (entry.find(CACHE_FILE_PREFIX) == 0 || entry == DUMP_FILE_NAME) { const string file_path = JoinPathSegments(path_, entry); + if (tracked_file_paths.find(file_path) != tracked_file_paths.end()) continue; KUDU_RETURN_IF_ERROR(kudu::Env::Default()->DeleteFile(file_path), Substitute("Failed to delete old cache file $0", file_path)); LOG(INFO) << Substitute("Deleted old cache file $0", file_path); @@ -504,20 +683,6 @@ Status DataCache::Partition::Init() { } RETURN_IF_ERROR(FileSystemUtil::VerifyIsDirectory(path_)); - // Delete all existing backing files left over from previous runs. - RETURN_IF_ERROR(DeleteExistingFiles()); - - // Check if there is enough space available at this point in time. - uint64_t available_bytes; - RETURN_IF_ERROR(FileSystemUtil::GetSpaceAvailable(path_, &available_bytes)); - if (available_bytes < capacity_) { - const string& err = Substitute("Insufficient space for $0. Required $1. Only $2 is " - "available", path_, PrettyPrinter::PrintBytes(capacity_), - PrettyPrinter::PrintBytes(available_bytes)); - LOG(ERROR) << err; - return Status(err); - } - // Make sure hole punching is supported for the caching directory. 
RETURN_IF_ERROR(FileSystemUtil::CheckHolePunch(path_)); @@ -549,9 +714,32 @@ Status DataCache::Partition::Init() { // Create metrics for this partition InitMetrics(); - // Create a backing file for the partition. - RETURN_IF_ERROR(CreateCacheFile()); - oldest_opened_file_ = 0; + // Only if loading is not enabled, or if loading is enabled but fails, do we need to + // create new cache file. + if (!(FLAGS_data_cache_keep_across_restarts && Load().ok())) { + // Create a backing file for the partition. + RETURN_IF_ERROR(CreateCacheFile()); + oldest_opened_file_ = 0; + } + + // Delete all cache files that are not listed in cache_files_ (including dumped metadata + // file). These files were left over from previous runs and are now no longer needed. + RETURN_IF_ERROR(DeleteUntrackedFiles()); + + // Check if there is enough space available at this point in time. + int64_t used_bytes = ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_TOTAL_BYTES->GetValue(); + uint64_t available_bytes; + RETURN_IF_ERROR(FileSystemUtil::GetSpaceAvailable(path_, &available_bytes)); + if (used_bytes + available_bytes < capacity_) { + const string& err = Substitute("Insufficient space for $0. Total required $1 and " + "cache already used $2. 
Still required $3, but only $4 is available", + path_, PrettyPrinter::PrintBytes(capacity_), PrettyPrinter::PrintBytes(used_bytes), + PrettyPrinter::PrintBytes(capacity_ - used_bytes), + PrettyPrinter::PrintBytes(available_bytes)); + LOG(ERROR) << err; + return Status(err); + } + return Status::OK(); } @@ -837,6 +1025,7 @@ bool DataCache::Partition::Store(const CacheKey& cache_key, const uint8_t* buffe void DataCache::Partition::DeleteOldFiles() { std::unique_lock<SpinLock> partition_lock(lock_); + if (UNLIKELY(files_readonly_)) return; DCHECK_GE(oldest_opened_file_, 0); int target = cache_files_.size() - FLAGS_data_cache_max_opened_files; while (oldest_opened_file_ < target) { @@ -844,6 +1033,156 @@ void DataCache::Partition::DeleteOldFiles() { } } +void DataCache::Partition::SetCacheFilesReadOnly() { + std::unique_lock<SpinLock> partition_lock(lock_); + for (auto& file : cache_files_) file->SetReadOnly(); + files_readonly_ = true; +} + +void DataCache::Partition::RevokeCacheFilesReadOnly() { + std::unique_lock<SpinLock> partition_lock(lock_); + for (auto& file : cache_files_) file->RevokeReadOnly(); + files_readonly_ = false; +} + +Status DataCache::Partition::Dump() { + std::unique_lock<SpinLock> partition_lock(lock_); + LOG(INFO) << Substitute("Partition $0 start dumping.", index_); + DumpData dump_data; + RETURN_IF_ERROR(DumpCacheFiles(dump_data)); + RETURN_IF_ERROR(DumpMetaCache(dump_data)); + std::ofstream file; + file.open(JoinPathSegments(path_, DUMP_FILE_NAME), std::ofstream::binary); + if (!file) return Status(Substitute("Failed to open $0", DUMP_FILE_NAME)); + try { + dump_data.serialize(file); + } catch (boost::archive::archive_exception& e) { + LOG(ERROR) << Substitute("Partition $0 failed to serialize dump file.", index_) + << e.what(); + return Status("Failed to serialize dump file."); + } + file.close(); + LOG(INFO) << Substitute("Partition $0 dump successfully.", index_); + return Status::OK(); +} + +Status DataCache::Partition::Load() { + 
std::ifstream file; + file.open(JoinPathSegments(path_, DUMP_FILE_NAME), std::ofstream::binary); + if (!file) return Status::Expected(Substitute("Failed to open $0", DUMP_FILE_NAME)); + LOG(INFO) << Substitute("Partition $0 start loading.", index_); + DumpData dump_data; + try { + dump_data.deserialize(file); + } catch (boost::archive::archive_exception& e) { + LOG(ERROR) << Substitute("Partition $0 failed to deserialize dump file.", index_) + << e.what(); + return Status("Failed to deserialize dump file."); + } + file.close(); + + Status status = LoadCacheFiles(dump_data); + if (!status.ok()) { + cache_files_.clear(); + return status; + } + + status = LoadMetaCache(dump_data); + if (!status.ok()) { + cache_files_.clear(); + meta_cache_.reset(NewCache(GetCacheEvictionPolicy(FLAGS_data_cache_eviction_policy), + capacity_, path_)); + // We have already successfully initialized meta_cache_ once before calling Load(), so + // there is no reason for it to fail this time. + ABORT_IF_ERROR(meta_cache_->Init()); + return status; + } + + LOG(INFO) << Substitute("Partition $0 load successfully.", index_); + return Status::OK(); +} + +Status DataCache::Partition::DumpCacheFiles(DumpData& dump_data) { + lock_.DCheckLocked(); + int64_t index = 0; + for (const auto& file : cache_files_) { + // There is no need to dump closed file, it can no longer be read or written or even + // deleted. + if (file->closed()) continue; + + // Just to be safe, flush the file. 
+ RETURN_IF_ERROR(file->Flush()); + + dump_data.cache_files_data.emplace_back(file->path(), file->allow_append(), + file->current_offset(), file->mtime()); + auto result = dump_data.file_indexs.emplace(file.get(), index++); + DCHECK(result.second); + } + return Status::OK(); +} + +Status DataCache::Partition::LoadCacheFiles(const DumpData& dump_data) { + lock_.DCheckLocked(); + cache_files_.clear(); + for (const DumpData::CacheFileData& file_data : dump_data.cache_files_data) { + unique_ptr<CacheFile> cache_file; + RETURN_IF_ERROR(CacheFile::Open(file_data.path, file_data.allow_append, + file_data.current_offset, &cache_file)); + if (file_data.mtime != cache_file->mtime()) { + return Status("The actual mtime of the cache file does not match the dumped data."); + } + cache_files_.emplace_back(std::move(cache_file)); + } + if (cache_files_.empty()) { + return Status("No cache files loaded."); + } + oldest_opened_file_ = 0; + return Status::OK(); +} + +Status DataCache::Partition::DumpMetaCache(DumpData& dump_data) { + lock_.DCheckLocked(); + DCHECK(dump_data.cache_files_data.size() == dump_data.file_indexs.size()); + + // Walk through the meta cache, taking all keys and entries and dumping them one by one. + vector<Cache::UniqueHandle> handles = meta_cache_->Dump(); + for (const auto& handle : handles) { + CacheEntry entry(meta_cache_->Value(handle)); + auto index_iter = dump_data.file_indexs.find(entry.file()); + // If the entry's cache file is not in the list, we do not need to dump it. 
+ if (index_iter == dump_data.file_indexs.end()) continue; + dump_data.meta_cache_data.emplace_back(meta_cache_->Key(handle).ToString(), + index_iter->second, entry.offset(), entry.len(), entry.checksum()); + } + return Status::OK(); +} + +Status DataCache::Partition::LoadMetaCache(const DumpData& dump_data) { + lock_.DCheckLocked(); + for (const DumpData::CacheKeyEntryData& key_entry : dump_data.meta_cache_data) { + // Usually this shouldn't happen, and if it did, the dump file may be corrupted and + // loading should be aborted. + if (UNLIKELY(key_entry.index >= cache_files_.size())) { + return Status("Failed to load meta cache, the dump file may be corrupted."); + } + + // According to the index, find the corresponding file pointer, build entry and insert + // into the cache like normal store. If the cache capacity has been changed, the entry + // may be evicted, but that's no matter. + CacheFile* file = cache_files_[key_entry.index].get(); + CacheEntry entry(file, key_entry.offset, key_entry.len, key_entry.checksum); + const int64_t charge_len = BitUtil::RoundUp(entry.len(), PAGE_SIZE); + Cache::UniquePendingHandle pending_handle( + meta_cache_->Allocate(Slice(key_entry.key), sizeof(CacheEntry), charge_len)); + if (UNLIKELY(pending_handle.get() == nullptr)) continue; + memcpy(meta_cache_->MutableValue(&pending_handle), &entry, sizeof(CacheEntry)); + meta_cache_->Insert(std::move(pending_handle), this); + ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_TOTAL_BYTES->Increment(charge_len); + ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_NUM_ENTRIES->Increment(1); + } + return Status::OK(); +} + void DataCache::Partition::EvictedEntry(Slice key, Slice value) { if (closed_) return; if (UNLIKELY(trace_replay_)) return; @@ -1013,6 +1352,10 @@ int64_t DataCache::Lookup(const string& filename, int64_t mtime, int64_t offset, bool DataCache::Store(const string& filename, int64_t mtime, int64_t offset, const uint8_t* buffer, int64_t buffer_len) { DCHECK(!partitions_.empty()); + + // 
Check early that the cache is read-only. + if (UNLIKELY(readonly_)) return false; + // Bail out early for uncacheable ranges or invalid requests. if (mtime < 0 || offset < 0 || buffer_len < 0) { VLOG(3) << Substitute("Skipping insertion of invalid entry $0 mtime: $1 offset: $2 " @@ -1037,6 +1380,43 @@ Status DataCache::CloseFilesAndVerifySizes() { return Status::OK(); } +int64_t DataCache::SetDataCacheReadOnly() { + if (readonly_) return ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_NUM_WRITES->GetValue(); + + // First set the read-only flag to reject new writes. The exclusive lock is then + // acquired, because the shared lock will be acquired before any writes begin, so it + // blocks here until all ongoing writes have completed and all shared lock has been + // released. This ensures that there will be no more change to the cache after the + // function returns. + readonly_ = true; + std::unique_lock<shared_mutex> lock(readonly_lock_); + for (auto& partition : partitions_) { + partition->SetCacheFilesReadOnly(); + } + + return ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_NUM_WRITES->GetValue(); +} + +int64_t DataCache::RevokeDataCacheReadOnly() { + if (!readonly_) return ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_NUM_WRITES->GetValue(); + + std::unique_lock<shared_mutex> lock(readonly_lock_); + for (auto& partition : partitions_) { + partition->RevokeCacheFilesReadOnly(); + } + readonly_ = false; + + return ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_NUM_WRITES->GetValue(); +} + +Status DataCache::Dump() { + SetDataCacheReadOnly(); + for (auto& partition : partitions_) { + RETURN_IF_ERROR(partition->Dump()); + } + return Status::OK(); +} + void DataCache::DeleteOldFiles(uint32_t thread_id, int partition_idx) { DCHECK_LT(partition_idx, partitions_.size()); partitions_[partition_idx]->DeleteOldFiles(); @@ -1094,6 +1474,12 @@ void DataCache::HandleStoreTask(uint32_t thread_id, const StoreTaskHandle& task) bool DataCache::StoreInternal(const CacheKey& key, const uint8_t* buffer, 
int64_t buffer_len) { + // Here, a shared lock is acquired with try_to_lock, while a unique lock is acquired in + // SetDataCacheReadOnly(). Therefore, when setting the read-only status, the lock + // acquisition here can quickly fail, then return. + kudu::shared_lock<shared_mutex> lock(readonly_lock_, std::try_to_lock); + if (UNLIKELY(!lock.owns_lock() || readonly_)) return false; + int idx = key.Hash() % partitions_.size(); bool start_reclaim; bool stored = partitions_[idx]->Store(key, buffer, buffer_len, &start_reclaim); diff --git a/be/src/runtime/io/data-cache.h b/be/src/runtime/io/data-cache.h index 1f5cd870b..ce49cfc3e 100644 --- a/be/src/runtime/io/data-cache.h +++ b/be/src/runtime/io/data-cache.h @@ -23,6 +23,7 @@ #include <unordered_map> #include <unordered_set> #include <gtest/gtest_prod.h> +#include <boost/thread/pthread/shared_mutex.hpp> #include "common/status.h" #include "util/cache/cache.h" @@ -190,6 +191,21 @@ class DataCache { /// partitions before verifying their sizes. Used by test only. Status CloseFilesAndVerifySizes(); + /// Set the data cache to read-only. After this function is called, all Store() calls + /// return false immediately, and for ongoing Store(), this function blocks until they + /// are complete. + /// Return current number of writes after read-only set for testing. + int64_t SetDataCacheReadOnly(); + + /// Revoke the data cache read-only. + /// Return current number of writes after read-only revoked for testing. + int64_t RevokeDataCacheReadOnly(); + + /// Dump the metadata of each cache partition to the same directory on disk as the cache + /// files, so that the dump file can be reloaded when the data cache is initialized. Note + /// that the data cache will be set to read-only mode before the data is dumped. 
+ Status Dump(); + private: friend class DataCacheBaseTest; friend class DataCacheTest; @@ -220,7 +236,7 @@ class DataCache { /// - removes any stale backing file in this partition /// - checks if there is enough storage space /// - checks if the filesystem supports hole punching - /// - creates an empty backing file. + /// - try to load dump file or creates an empty backing file. /// /// Returns error if there is any of the above steps fails. Returns OK otherwise. Status Init(); @@ -264,6 +280,15 @@ class DataCache { /// --data_cache_max_opened_files. void DeleteOldFiles(); + /// Set all cache files of this partition to read-only. + void SetCacheFilesReadOnly(); + + /// Revoke all cache files of this partition read-only. + void RevokeCacheFilesReadOnly(); + + /// Dump the 'meta_cache_' and 'cache_files_' of this partition as dump file. + Status Dump(); + private: friend class DataCacheBaseTest; friend class DataCacheTest; @@ -271,6 +296,8 @@ class DataCache { FRIEND_TEST(DataCacheTest, NonRotationalDisk); FRIEND_TEST(DataCacheTest, InvalidDisk); + class DumpData; + /// Index of this partition. This is used for naming metrics or other items that /// need separate values for each partition. It does not impact cache behavior. int32_t index_; @@ -299,9 +326,16 @@ class DataCache { /// The prefix of the names of the cache backing files. static const char* CACHE_FILE_PREFIX; + /// The file name of the data cache dump file. + static const char* DUMP_FILE_NAME; + /// Protects the following fields. SpinLock lock_; + /// If it is true, no file deletion should take place. Set before the files are set to + /// read only. + bool files_readonly_ = false; + /// Index into 'cache_files_' of the oldest opened file. int oldest_opened_file_ = -1; @@ -344,9 +378,10 @@ class DataCache { /// error on failure. Status CreateCacheFile(); - /// Utility function to delete cache files left over from previous runs of Impala. 
+ /// Utility function to delete cache files that are not tracked by cache_files_ and + /// may have been left over from previous runs of Impala. /// Returns error on failure. - Status DeleteExistingFiles() const; + Status DeleteUntrackedFiles() const; /// Utility function for computing the checksum of 'buffer' with length 'buffer_len'. static uint64_t Checksum(const uint8_t* buffer, int64_t buffer_len); @@ -384,6 +419,21 @@ class DataCache { static bool VerifyChecksum(const std::string& ops_name, const CacheEntry& entry, const uint8_t* buffer, int64_t buffer_len); + /// Load the 'cache_files_' and 'meta_cache_' of this partition from dump file. + Status Load(); + + /// Dump the 'cache_files_' of this partition into 'dump_data'. + Status DumpCacheFiles(DumpData& dump_data); + + /// Load the 'cache_files_' for this partition from 'dump_data'. + Status LoadCacheFiles(const DumpData& dump_data); + + /// Dump the 'meta_cache_' of this partition into 'dump_data'. + Status DumpMetaCache(DumpData& dump_data); + + /// Load the 'meta_cache_' for this partition from 'dump_data'. + Status LoadMetaCache(const DumpData& dump_data); + void Trace(const trace::EventType& status, const DataCache::CacheKey& key, int64_t lookup_len, int64_t entry_len); }; @@ -398,6 +448,12 @@ class DataCache { /// operations, and no filesystem operations are required. bool trace_replay_; + /// This lock keep SetDataCacheReadOnly() blocked until all ongoing Store() complete. + boost::shared_mutex readonly_lock_; + + /// Store() will return false immediately if this is true. + bool readonly_ = false; + /// The set of all cache partitions. std::vector<std::unique_ptr<Partition>> partitions_; diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc index fae86e948..94a4a5555 100644 --- a/be/src/runtime/io/disk-io-mgr.cc +++ b/be/src/runtime/io/disk-io-mgr.cc @@ -81,6 +81,13 @@ DEFINE_int32(data_cache_num_async_write_threads, 0, "--data_cache_async_write_buffer_limit. 
If this's 0, then write will be " "synchronous."); +DEFINE_bool(data_cache_keep_across_restarts, false, + "(Experimental) If this is true, the data cache metadata is dumped to the same " + "directory as the cached files on disk when the process gracefully shutdowns. The " + "metadata will be reloaded the next time the process starts, so that the previous " + "cached data can be reused as if the process had never shutdown. If loading fails, " + "it will proceed with regular initialization."); + // Rotational disks should have 1 thread per disk to minimize seeks. Non-rotational // don't have this penalty and benefit from multiple concurrent IO requests. static const int THREADS_PER_ROTATIONAL_DISK = 1; @@ -731,6 +738,13 @@ Status DiskIoMgr::AllocateBuffersForRange( min_buffer_size_, max_buffer_size_); } +Status DiskIoMgr::DumpDataCache() { + if (FLAGS_data_cache_keep_across_restarts && remote_data_cache_) { + return remote_data_cache_->Dump(); + } + return Status::Expected("No cache dump is required."); +} + vector<int64_t> DiskIoMgr::ChooseBufferSizes(int64_t scan_range_len, int64_t max_bytes) { DCHECK_GE(max_bytes, min_buffer_size_); vector<int64_t> buffer_sizes; diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h index 7cf7662e3..5e3fd46ea 100644 --- a/be/src/runtime/io/disk-io-mgr.h +++ b/be/src/runtime/io/disk-io-mgr.h @@ -418,6 +418,10 @@ class DiskIoMgr : public CacheLineAligned { /// is something invalid about the scan range. Status ValidateScanRange(ScanRange* range) WARN_UNUSED_RESULT; + /// Try to dump the metadata of the remote data cache to disk, so that it can be + /// reloaded when impalad restarts. 
+ Status DumpDataCache(); + DataCache* remote_data_cache() { return remote_data_cache_.get(); } private: diff --git a/be/src/scheduling/executor-group.cc b/be/src/scheduling/executor-group.cc index 5b711dc37..ba313f8cf 100644 --- a/be/src/scheduling/executor-group.cc +++ b/be/src/scheduling/executor-group.cc @@ -96,6 +96,21 @@ void ExecutorGroup::AddExecutor(const BackendDescriptorPB& be_desc) { executor_ip_hash_ring_.AddNode(be_desc.ip_address()); } be_descs.push_back(be_desc); + + // When computing ScanRange assignments, if there are multiple backends on a host, a + // round-robin approach is taken. That is, the backend that a ScanRange assigned to the + // host is eventually given to depends on the order of these backends in the vector; + // the corresponding code is located in + // Scheduler::AssignmentCtx::SelectExecutorOnHost(). Because a backend's remote data + // cache can be dumped and reloaded, it is better to keep the order of the backends + // consistent before and after a restart (here by sorting on port). This ensures that + // ScanRange assignments do not change after certain backends, or even the entire + // cluster, restart, which improves the data cache hit rate. 
+ auto cmp = [](const BackendDescriptorPB& a, const BackendDescriptorPB& b) { + return a.address().port() < b.address().port(); + }; + std::sort(be_descs.begin(), be_descs.end(), cmp); + executor_ip_map_[be_desc.address().hostname()] = be_desc.ip_address(); DCHECK(be_desc.admit_mem_limit() > 0) << "Admit memory limit must be set for backends"; diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index 1e4c7a836..3e73c0029 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -78,6 +78,7 @@ #include "runtime/timestamp-value.h" #include "runtime/timestamp-value.inline.h" #include "runtime/tmp-file-mgr.h" +#include "runtime/io/disk-io-mgr.h" #include "scheduling/admission-control-service.h" #include "scheduling/admission-controller.h" #include "service/cancellation-work.h" @@ -3247,6 +3248,9 @@ Status ImpalaServer::StartShutdown( curr_deadline = shutdown_deadline_.Load(); } + // Dump the data cache before shutdown. + discard_result(ExecEnv::GetInstance()->disk_io_mgr()->DumpDataCache()); + while (shutting_down_.Load() == 0) { if (!shutting_down_.CompareAndSwap(0, now)) continue; unique_ptr<Thread> t; diff --git a/be/src/util/cache/cache-internal.h b/be/src/util/cache/cache-internal.h index e46a97afe..8432d321e 100644 --- a/be/src/util/cache/cache-internal.h +++ b/be/src/util/cache/cache-internal.h @@ -205,6 +205,7 @@ class CacheShard { virtual void Release(HandleBase* handle) = 0; virtual void Erase(const Slice& key, uint32_t hash) = 0; virtual size_t Invalidate(const Cache::InvalidationControl& ctl) = 0; + virtual vector<HandleBase*> Dump() = 0; }; // Function to build a cache shard using the given eviction algorithm. 
@@ -271,6 +272,10 @@ class ShardedCache : public Cache { shards_[Shard(hash)]->Erase(key, hash); } + Slice Key(const UniqueHandle& handle) const override { + return reinterpret_cast<HandleBase*>(handle.get())->key(); + } + Slice Value(const UniqueHandle& handle) const override { return reinterpret_cast<HandleBase*>(handle.get())->value(); } @@ -304,6 +309,17 @@ class ShardedCache : public Cache { return invalidated_count; } + vector<UniqueHandle> Dump() override { + vector<UniqueHandle> handles; + for (auto& shard : shards_) { + for (HandleBase* handle : shard->Dump()) { + handles.emplace_back( + reinterpret_cast<Cache::Handle*>(handle), Cache::HandleDeleter(this)); + } + } + return handles; + } + protected: void Release(Handle* handle) override { HandleBase* h = reinterpret_cast<HandleBase*>(handle); diff --git a/be/src/util/cache/cache.h b/be/src/util/cache/cache.h index aac172930..dabbb22e3 100644 --- a/be/src/util/cache/cache.h +++ b/be/src/util/cache/cache.h @@ -183,6 +183,10 @@ class Cache { // to it have been released. virtual void Erase(const Slice& key) = 0; + // Return the key encapsulated in a raw handle returned by a successful + // Lookup(). + virtual Slice Key(const UniqueHandle& handle) const = 0; + // Return the value encapsulated in a raw handle returned by a successful // Lookup(). virtual Slice Value(const UniqueHandle& handle) const = 0; @@ -279,6 +283,10 @@ class Cache { // See the in-line documentation for IterationFunc for more details. virtual size_t Invalidate(const InvalidationControl& ctl) = 0; + // Walk through all valid entries in the cache and push their handles into the return + // vector. + virtual std::vector<UniqueHandle> Dump() = 0; + // Functor to define a criterion on a cache entry's validity. 
Upon call // of Cache::Invalidate() method, if the functor returns 'false' for the // specified key and value, the cache evicts the entry, otherwise the entry diff --git a/be/src/util/cache/lirs-cache.cc b/be/src/util/cache/lirs-cache.cc index 9a83d7df5..20f62db59 100644 --- a/be/src/util/cache/lirs-cache.cc +++ b/be/src/util/cache/lirs-cache.cc @@ -330,6 +330,7 @@ class LIRSCacheShard : public CacheShard { void Release(HandleBase* handle) override; void Erase(const Slice& key, uint32_t hash) override; size_t Invalidate(const Cache::InvalidationControl& ctl) override; + vector<HandleBase*> Dump() override; private: @@ -1097,6 +1098,39 @@ size_t LIRSCacheShard::Invalidate(const Cache::InvalidationControl& ctl) { return 0; } +vector<HandleBase*> LIRSCacheShard::Dump() { + DCHECK(initialized_); + std::lock_guard<MutexType> l(mutex_); + + // For LIRS cache we only collect resident entries (i.e. PROTECTED/UNPROTECTED entries), + // and ignore entries that are not resident (i.e. TOMBSTONE entries). + vector<HandleBase*> handles; + + for (LIRSHandle& h : recency_list_) { + // First walk through 'recency_list_', only collecting PROTECTED entries, ignoring + // the UNPROTECTED/TOMBSTONE entries. UNPROTECTED entries will be collected later from + // the unprotected_tombstone_list_. + if (h.state() == PROTECTED) { + h.get_reference(); + handles.push_back(&h); + } + } + + if (unprotected_list_front_ != nullptr) { + DCHECK(unprotected_list_front_->unprotected_tombstone_list_hook_.is_linked()); + // From 'unprotected_list_front_' to the end of 'unprotected_tombstone_list_' are all + // UNPROTECTED entries, collecting them all. 
+ auto iter = unprotected_tombstone_list_.iterator_to(*unprotected_list_front_); + while (iter != unprotected_tombstone_list_.end()) { + iter->get_reference(); + handles.push_back(&*iter); + ++iter; + } + } + + return handles; +} + } // end anonymous namespace template<> diff --git a/be/src/util/cache/rl-cache.cc b/be/src/util/cache/rl-cache.cc index 30cfe4e5b..5f2497172 100644 --- a/be/src/util/cache/rl-cache.cc +++ b/be/src/util/cache/rl-cache.cc @@ -93,6 +93,7 @@ class RLCacheShard : public CacheShard { void Release(HandleBase* handle) override; void Erase(const Slice& key, uint32_t hash) override; size_t Invalidate(const Cache::InvalidationControl& ctl) override; + vector<HandleBase*> Dump() override; private: void RL_Remove(RLHandle* e); @@ -419,6 +420,19 @@ size_t RLCacheShard<policy>::Invalidate(const Cache::InvalidationControl& ctl) { return invalid_entry_count; } +template<Cache::EvictionPolicy policy> +vector<HandleBase*> RLCacheShard<policy>::Dump() { + std::lock_guard<decltype(mutex_)> l(mutex_); + vector<HandleBase*> handles; + RLHandle* h = rl_.next; + while (h != &rl_) { + h->refs.fetch_add(1, std::memory_order_relaxed); + handles.push_back(h); + h = h->next; + } + return handles; +} + } // end anonymous namespace template<> diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py index 60de1f80d..38849f8bc 100644 --- a/tests/common/impala_cluster.py +++ b/tests/common/impala_cluster.py @@ -428,6 +428,19 @@ class Process(object): self.kill(signal) self.wait_for_exit() + def modify_argument(self, argument, new_value): + """Modify the 'argument' in start args with new_value. + If no 'argument' in start args, add it. 
+ If new_value is None, add or remove 'argument'.""" + for i in range(1, len(self.cmd)): + if self.cmd[i].split('=')[0] == argument: + if new_value is None: + del self.cmd[i] + else: + self.cmd[i] = (argument + '=' + new_value) + return + self.cmd.append(argument if new_value is None else (argument + '=' + new_value)) + # Base class for all Impala processes class BaseImpalaProcess(Process): diff --git a/tests/custom_cluster/test_data_cache.py b/tests/custom_cluster/test_data_cache.py index 3d874692d..e57577acf 100644 --- a/tests/custom_cluster/test_data_cache.py +++ b/tests/custom_cluster/test_data_cache.py @@ -16,6 +16,9 @@ # under the License. from __future__ import absolute_import, division, print_function +from signal import SIGRTMIN +from time import sleep + import pytest from tests.common.custom_cluster_test_suite import CustomClusterTestSuite @@ -31,7 +34,7 @@ class TestDataCache(CustomClusterTestSuite): on the scheduler's behavior and number of HDFS blocks. """ @classmethod - def get_workload(self): + def get_workload(cls): return 'functional-query' @classmethod @@ -41,15 +44,21 @@ class TestDataCache(CustomClusterTestSuite): super(TestDataCache, cls).setup_class() def get_impalad_args(eviction_policy, high_write_concurrency=True, - force_single_shard=True): + force_single_shard=True, keep_across_restarts=False): impalad_args = ["--always_use_data_cache=true"] if (high_write_concurrency): impalad_args.append("--data_cache_write_concurrency=64") if (force_single_shard): impalad_args.append("--cache_force_single_shard") + if (keep_across_restarts): + impalad_args.append("--data_cache_keep_across_restarts=true") + impalad_args.append("--shutdown_grace_period_s=1") impalad_args.append("--data_cache_eviction_policy={0}".format(eviction_policy)) return " ".join(impalad_args) + def get_data_cache_metric(self, suffix): + return self.get_metric('impala-server.io-mgr.remote-data-cache-' + suffix) + CACHE_START_ARGS = "--data_cache_dir=/tmp --data_cache_size=500MB" 
def __test_data_cache_deterministic(self, vector, unique_database): @@ -58,19 +67,17 @@ class TestDataCache(CustomClusterTestSuite): a single node to make it easier to verify the runtime profile. Also enables higher write concurrency and uses a single shard to avoid non-determinism. """ - opened_file_handles_metric = 'impala-server.io.mgr.cached-file-handles-miss-count' self.run_test_case('QueryTest/data-cache', vector, unique_database) - assert self.get_metric('impala-server.io-mgr.remote-data-cache-dropped-bytes') >= 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-dropped-entries') >= 0 - assert \ - self.get_metric('impala-server.io-mgr.remote-data-cache-instant-evictions') >= 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-hit-bytes') > 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-hit-count') > 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-miss-bytes') > 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-miss-count') > 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-total-bytes') > 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-num-entries') > 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-num-writes') > 0 + assert self.get_data_cache_metric('dropped-bytes') >= 0 + assert self.get_data_cache_metric('dropped-entries') >= 0 + assert self.get_data_cache_metric('instant-evictions') >= 0 + assert self.get_data_cache_metric('hit-bytes') > 0 + assert self.get_data_cache_metric('hit-count') > 0 + assert self.get_data_cache_metric('miss-bytes') > 0 + assert self.get_data_cache_metric('miss-count') > 0 + assert self.get_data_cache_metric('total-bytes') > 0 + assert self.get_data_cache_metric('num-entries') > 0 + assert self.get_data_cache_metric('num-writes') > 0 # Expect all cache hits results in no opened files. 
opened_file_handles_metric = 'impala-server.io.mgr.cached-file-handles-miss-count' @@ -107,18 +114,18 @@ class TestDataCache(CustomClusterTestSuite): QUERY = "select * from tpch_parquet.lineitem" # Do a first run to warm up the cache. Expect no hits. self.execute_query(QUERY) - assert self.get_metric('impala-server.io-mgr.remote-data-cache-hit-bytes') == 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-hit-count') == 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-miss-bytes') > 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-miss-count') > 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-total-bytes') > 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-num-entries') > 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-num-writes') > 0 + assert self.get_data_cache_metric('hit-bytes') == 0 + assert self.get_data_cache_metric('hit-count') == 0 + assert self.get_data_cache_metric('miss-bytes') > 0 + assert self.get_data_cache_metric('miss-count') > 0 + assert self.get_data_cache_metric('total-bytes') > 0 + assert self.get_data_cache_metric('num-entries') > 0 + assert self.get_data_cache_metric('num-writes') > 0 # Do a second run. Expect some hits. self.execute_query(QUERY) - assert self.get_metric('impala-server.io-mgr.remote-data-cache-hit-bytes') > 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-hit-count') > 0 + assert self.get_data_cache_metric('hit-bytes') > 0 + assert self.get_data_cache_metric('hit-count') > 0 @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( @@ -138,13 +145,13 @@ class TestDataCache(CustomClusterTestSuite): def __test_data_cache_disablement(self, vector): # Verifies that the cache metrics are all zero. 
- assert self.get_metric('impala-server.io-mgr.remote-data-cache-hit-bytes') == 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-hit-count') == 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-miss-bytes') == 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-miss-count') == 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-total-bytes') == 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-num-entries') == 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-num-writes') == 0 + assert self.get_data_cache_metric('hit-bytes') == 0 + assert self.get_data_cache_metric('hit-count') == 0 + assert self.get_data_cache_metric('miss-bytes') == 0 + assert self.get_data_cache_metric('miss-count') == 0 + assert self.get_data_cache_metric('total-bytes') == 0 + assert self.get_data_cache_metric('num-entries') == 0 + assert self.get_data_cache_metric('num-writes') == 0 # Runs a query with the cache disabled and then enabled against multiple file formats. # Verifies that the metrics stay at zero when the cache is disabled. 
@@ -153,16 +160,11 @@ class TestDataCache(CustomClusterTestSuite): for file_format in ['text_gzip', 'parquet', 'avro', 'seq', 'rc']: QUERY = "select * from functional_{0}.alltypes".format(file_format) self.execute_query(QUERY, vector.get_value('exec_option')) - assert disable_cache ==\ - (self.get_metric('impala-server.io-mgr.remote-data-cache-miss-bytes') == 0) - assert disable_cache ==\ - (self.get_metric('impala-server.io-mgr.remote-data-cache-miss-count') == 0) - assert disable_cache ==\ - (self.get_metric('impala-server.io-mgr.remote-data-cache-total-bytes') == 0) - assert disable_cache ==\ - (self.get_metric('impala-server.io-mgr.remote-data-cache-num-entries') == 0) - assert disable_cache ==\ - (self.get_metric('impala-server.io-mgr.remote-data-cache-num-writes') == 0) + assert disable_cache == (self.get_data_cache_metric('miss-bytes') == 0) + assert disable_cache == (self.get_data_cache_metric('miss-count') == 0) + assert disable_cache == (self.get_data_cache_metric('total-bytes') == 0) + assert disable_cache == (self.get_data_cache_metric('num-entries') == 0) + assert disable_cache == (self.get_data_cache_metric('num-writes') == 0) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( @@ -192,23 +194,128 @@ class TestDataCache(CustomClusterTestSuite): # 8MB cache inserts to be instantly evicted. 
QUERY = "select count(*) from tpch.lineitem" self.execute_query(QUERY) - assert self.get_metric('impala-server.io-mgr.remote-data-cache-miss-bytes') > 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-miss-count') > 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-total-bytes') >= 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-num-entries') >= 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-num-writes') >= 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-instant-evictions') > 0 + assert self.get_data_cache_metric('miss-bytes') > 0 + assert self.get_data_cache_metric('miss-count') > 0 + assert self.get_data_cache_metric('total-bytes') >= 0 + assert self.get_data_cache_metric('num-entries') >= 0 + assert self.get_data_cache_metric('num-writes') >= 0 + assert self.get_data_cache_metric('instant-evictions') > 0 # Run the query multiple times and verify that none of the counters go negative instant_evictions_before = \ - self.get_metric('impala-server.io-mgr.remote-data-cache-instant-evictions') + self.get_data_cache_metric('instant-evictions') for i in range(10): self.execute_query(QUERY) instant_evictions_after = \ - self.get_metric('impala-server.io-mgr.remote-data-cache-instant-evictions') + self.get_data_cache_metric('instant-evictions') assert instant_evictions_after - instant_evictions_before > 0 # All the counters remain positive - assert self.get_metric('impala-server.io-mgr.remote-data-cache-num-entries') >= 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-num-writes') >= 0 - assert self.get_metric('impala-server.io-mgr.remote-data-cache-total-bytes') >= 0 + assert self.get_data_cache_metric('num-entries') >= 0 + assert self.get_data_cache_metric('num-writes') >= 0 + assert self.get_data_cache_metric('total-bytes') >= 0 + + def __test_data_cache_keep_across_restarts(self, vector, test_reduce_size=False): + QUERY = "select * from tpch_parquet.lineitem" 
+ # Execute a query, record the total bytes and the number of entries of cache before + # cache dump. + self.execute_query(QUERY) + assert self.get_data_cache_metric('hit-bytes') == 0 + assert self.get_data_cache_metric('hit-count') == 0 + total_bytes = self.get_data_cache_metric('total-bytes') + num_entries = self.get_data_cache_metric('num-entries') + + # Do graceful restart and, if necessary, reduce the cache size by 1/5. + impalad = self.cluster.impalads[0] + impalad.kill_and_wait_for_exit(SIGRTMIN) + new_size = 4 * total_bytes // 5 + if test_reduce_size: + impalad.modify_argument('-data_cache', '/tmp/impala-datacache-0:' + str(new_size)) + impalad.start() + impalad.service.wait_for_num_known_live_backends(1) + + # After the restart, we expect the cache to have the same total bytes + # and number of entries as before the restart, and if the cache size is reduced, + # the metrics should be reduced accordingly. + if test_reduce_size: + assert self.get_data_cache_metric('total-bytes') <= new_size + assert self.get_data_cache_metric('num-entries') < num_entries + else: + assert self.get_data_cache_metric('total-bytes') == total_bytes + assert self.get_data_cache_metric('num-entries') == num_entries + + # Reconnect to the service and execute the query, expecting some cache hits. 
+ self.client.connect() + self.execute_query(QUERY) + assert self.get_data_cache_metric('hit-bytes') > 0 + assert self.get_data_cache_metric('hit-count') > 0 + if test_reduce_size: + assert self.get_data_cache_metric('miss-bytes') > 0 + assert self.get_data_cache_metric('miss-count') > 0 + else: + assert self.get_data_cache_metric('miss-bytes') == 0 + assert self.get_data_cache_metric('miss-count') == 0 + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=get_impalad_args("LRU", keep_across_restarts=True), + start_args=CACHE_START_ARGS, cluster_size=1) + def test_data_cache_keep_across_restarts_lru(self, vector): + self.__test_data_cache_keep_across_restarts(vector) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=get_impalad_args("LIRS", keep_across_restarts=True), + start_args=CACHE_START_ARGS, cluster_size=1) + def test_data_cache_keep_across_restarts_lirs(self, vector): + self.__test_data_cache_keep_across_restarts(vector) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=get_impalad_args("LRU", keep_across_restarts=True), + start_args=CACHE_START_ARGS, cluster_size=1) + def test_data_cache_reduce_size_restarts_lru(self, vector): + self.__test_data_cache_keep_across_restarts(vector, test_reduce_size=True) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=get_impalad_args("LIRS", keep_across_restarts=True), + start_args=CACHE_START_ARGS, cluster_size=1) + def test_data_cache_reduce_size_restarts_lirs(self, vector): + self.__test_data_cache_keep_across_restarts(vector, test_reduce_size=True) + + def __test_data_cache_readonly(self, vector): + QUERY = "select * from tpch_parquet.lineitem" + # Execute the query asynchronously, wait a short while, and do gracefully shutdown + # immediately to test the race between cache writes and setting cache read-only. 
+ handle = self.execute_query_async(QUERY) + sleep(1) + impalad = self.cluster.impalads[0] + impalad.kill(SIGRTMIN) + self.client.fetch(QUERY, handle) + self.client.close_query(handle) + impalad.wait_for_exit() + impalad.start() + impalad.service.wait_for_num_known_live_backends(1) + + # We hope that in this case, the cache is still properly dumped and loaded, + # and then the same query is executed to expect some cache hits. + self.assert_impalad_log_contains('INFO', 'Partition 0 load successfully.') + self.client.connect() + self.execute_query(QUERY) + assert self.get_data_cache_metric('hit-bytes') > 0 + assert self.get_data_cache_metric('hit-count') > 0 + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=get_impalad_args("LRU", keep_across_restarts=True), + start_args=CACHE_START_ARGS, cluster_size=1) + def test_data_cache_readonly_lru(self, vector): + self.__test_data_cache_readonly(vector) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=get_impalad_args("LIRS", keep_across_restarts=True), + start_args=CACHE_START_ARGS, cluster_size=1) + def test_data_cache_readonly_lirs(self, vector): + self.__test_data_cache_readonly(vector)
