IMPALA-3202,IMPALA-2079: rework scratch file I/O Refactor BufferedBlockMgr/TmpFileMgr to push more I/O logic into TmpFileMgr, in anticipation of it being shared with BufferPool. TmpFileMgr now handles: * Scratch space allocation and recycling * Read and write I/O
The interface is also greatly changed so that it is built around Write() and Read() calls, abstracting away the details of temporary file allocation from clients. This means the TmpFileMgr::File class can be hidden from clients. Write error recovery: Also implement write error recovery in TmpFileMgr. If an error occurs while writing to scratch and we have multiple scratch directories, we will try one of the other directories before cancelling the query. File-level blacklisting is used to prevent excessive repeated attempts to resize a scratch file during a single query. Device-level blacklisting is not implemented because it is problematic to permanently take a scratch directory out of use. To reduce the number of error paths, all I/O errors are now handled asynchronously. Previously, errors creating or extending the file were returned synchronously from WriteUnpinnedBlock(). This required modifying DiskIoMgr to create the file if not present when opened. Also set the default max_errors value in the thrift definition file, so that it is in effect for backend tests. Future Work: * Support for recycling variable-length scratch file ranges. I omitted this to avoid making the patch even larger. Testing: Updated BufferedBlockMgr unit test to reflect changes in behaviour: * Scratch space is no longer permanently associated with a block, and is remapped every time a new block is written to disk. * Files are now blacklisted - updated existing tests and enabled the disabled blacklisting test. Added some basic testing of recycling of scratch file ranges in the TmpFileMgr unit test. I also manually tested the code in two ways. First, by removing permissions for /tmp/impala-scratch and ensuring that a spilling query fails cleanly. Second, by creating a tiny ramdisk (16M) and running with two scratch directories: one on /tmp and one on the tiny ramdisk. When spilling, an out-of-space error is encountered for the tiny ramdisk and Impala spills the remaining data (72M) to /tmp. 
Change-Id: I8c9c587df006d2f09d72dd636adafbd295fcdc17 Reviewed-on: http://gerrit.cloudera.org:8080/5141 Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/95ed4434 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/95ed4434 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/95ed4434 Branch: refs/heads/master Commit: 95ed4434f2f446e214934f7dc251b843c1d6b0a6 Parents: 6b90aa3 Author: Tim Armstrong <[email protected]> Authored: Fri Sep 4 10:54:11 2015 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Thu Jan 5 02:26:24 2017 +0000 ---------------------------------------------------------------------- be/src/runtime/buffered-block-mgr-test.cc | 159 +++--- be/src/runtime/buffered-block-mgr.cc | 225 +++------ be/src/runtime/buffered-block-mgr.h | 90 ++-- be/src/runtime/disk-io-mgr-test.cc | 14 +- be/src/runtime/disk-io-mgr.cc | 42 +- be/src/runtime/disk-io-mgr.h | 19 +- be/src/runtime/exec-env.cc | 8 +- be/src/runtime/exec-env.h | 2 +- be/src/runtime/query-state.cc | 2 - be/src/runtime/tmp-file-mgr-internal.h | 93 ++++ be/src/runtime/tmp-file-mgr-test.cc | 322 ++++++++---- be/src/runtime/tmp-file-mgr.cc | 504 ++++++++++++++----- be/src/runtime/tmp-file-mgr.h | 430 ++++++++++------ be/src/util/disk-info.cc | 1 - be/src/util/disk-info.h | 24 +- be/src/util/filesystem-util.cc | 11 - be/src/util/filesystem-util.h | 3 - be/src/util/mem-range.h | 47 ++ common/thrift/ImpalaInternalService.thrift | 2 +- .../functional-query/queries/QueryTest/set.test | 8 +- tests/custom_cluster/test_scratch_disk.py | 72 +-- 21 files changed, 1331 insertions(+), 747 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/buffered-block-mgr-test.cc 
---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-block-mgr-test.cc b/be/src/runtime/buffered-block-mgr-test.cc index 1828ff8..9b616f5 100644 --- a/be/src/runtime/buffered-block-mgr-test.cc +++ b/be/src/runtime/buffered-block-mgr-test.cc @@ -15,13 +15,14 @@ // specific language governing permissions and limitations // under the License. -#include <gutil/strings/substitute.h> -#include <sys/stat.h> #include <boost/bind.hpp> #include <boost/date_time/posix_time/posix_time.hpp> #include <boost/filesystem.hpp> +#include <boost/regex.hpp> #include <boost/scoped_ptr.hpp> #include <boost/thread/thread.hpp> +#include <gutil/strings/substitute.h> +#include <sys/stat.h> #include "codegen/llvm-codegen.h" #include "common/init.h" @@ -37,6 +38,7 @@ #include "testutil/gtest-util.h" #include "util/cpu-info.h" #include "util/disk-info.h" +#include "util/error-util.h" #include "util/filesystem-util.h" #include "util/promise.h" #include "util/test-info.h" @@ -49,6 +51,7 @@ using boost::filesystem::directory_iterator; using boost::filesystem::remove; +using boost::regex; // Note: This is the default scratch dir created by impala. // FLAGS_scratch_dirs + TmpFileMgr::TMP_SUB_DIR_NAME. 
@@ -100,7 +103,7 @@ class BufferedBlockMgrTest : public ::testing::Test { created_tmp_dirs_.push_back(dir); } test_env_->InitTmpFileMgr(tmp_dirs, false); - EXPECT_EQ(num_dirs, test_env_->tmp_file_mgr()->num_active_tmp_devices()); + EXPECT_EQ(num_dirs, test_env_->tmp_file_mgr()->NumActiveTmpDevices()); return tmp_dirs; } @@ -254,9 +257,7 @@ class BufferedBlockMgrTest : public ::testing::Test { } static bool AllWritesComplete(BufferedBlockMgr* block_mgr) { - RuntimeProfile::Counter* writes_outstanding = - block_mgr->profile()->GetCounter("BlockWritesOutstanding"); - return writes_outstanding->value() == 0; + return block_mgr->GetNumWritesOutstanding() == 0; } static bool AllWritesComplete(const vector<BufferedBlockMgr*>& block_mgrs) { @@ -266,13 +267,12 @@ class BufferedBlockMgrTest : public ::testing::Test { return true; } - // Delete the temporary file backing a block - all subsequent writes to the file - // should fail. Expects backing file has already been allocated. - static void DeleteBackingFile(BufferedBlockMgr::Block* block) { - const string& path = block->TmpFilePath(); - ASSERT_GT(path.size(), 0); - ASSERT_TRUE(remove(path)); - LOG(INFO) << "Injected fault by deleting file " << path; + // Remove permissions for the temporary file at 'path' - all subsequent writes + // to the file should fail. Expects backing file has already been allocated. + static void DisableBackingFile(const string& path) { + EXPECT_GT(path.size(), 0); + EXPECT_EQ(0, chmod(path.c_str(), 0)); + LOG(INFO) << "Injected fault by removing file permissions " << path; } // Check that the file backing the block has dir as a prefix of its path. 
@@ -910,9 +910,11 @@ void BufferedBlockMgrTest::TestRuntimeStateTeardown( UnpinBlocks(blocks); vector<BufferedBlockMgr::Block*> more_blocks; AllocateBlocks(block_mgr.get(), client, max_num_buffers, &more_blocks); + + const string& tmp_file_path = blocks[0]->TmpFilePath(); DeleteBlocks(more_blocks); PinBlocks(blocks); - DeleteBackingFile(blocks[0]); + DisableBackingFile(tmp_file_path); } // Unpin will initiate writes. If the write error propagates fast enough, some Unpin() @@ -968,14 +970,15 @@ TEST_F(BufferedBlockMgrTest, WriteCompleteWithCancelledRuntimeState) { DeleteBlocks(blocks); } -// Clear scratch directory. Return # of files deleted. -static int clear_scratch_dir() { +// Remove write permissions on scratch files. Return # of scratch files. +static int remove_scratch_perms() { int num_files = 0; directory_iterator dir_it(SCRATCH_DIR); for (; dir_it != directory_iterator(); ++dir_it) { ++num_files; - remove_all(dir_it->path()); + chmod(dir_it->path().c_str(), 0); } + return num_files; } @@ -997,7 +1000,7 @@ TEST_F(BufferedBlockMgrTest, WriteError) { // Repin the blocks PinBlocks(blocks); // Remove the backing storage so that future writes will fail - int num_files = clear_scratch_dir(); + int num_files = remove_scratch_perms(); ASSERT_GT(num_files, 0); UnpinBlocks(blocks, true); WaitForWrites(block_mgr); @@ -1024,23 +1027,25 @@ TEST_F(BufferedBlockMgrTest, TmpFileAllocateError) { ASSERT_OK(blocks[0]->Unpin()); WaitForWrites(block_mgr); // Remove temporary files - subsequent operations will fail. - int num_files = clear_scratch_dir(); - ASSERT_GT(num_files, 0); - // Current implementation will fail here because it tries to expand the tmp file - // immediately. This behavior is not contractual but we want to know if it changes - // accidentally. 
- Status status = blocks[1]->Unpin(); - ASSERT_FALSE(status.ok()); + int num_files = remove_scratch_perms(); + ASSERT_TRUE(num_files > 0); + // Current implementation will not fail here until it attempts to write the file. + // This behavior is not contractual but we want to know if it changes accidentally. + ASSERT_OK(blocks[1]->Unpin()); + + // Write failure should cancel query + WaitForWrites(block_mgr); + ASSERT_TRUE(block_mgr->IsCancelled()); DeleteBlocks(blocks); TearDownMgrs(); } // Test that the block manager is able to blacklist a temporary device correctly after a -// write error. We should not allocate more blocks on that device, but existing blocks -// on the device will remain in use. -/// Disabled because blacklisting was disabled as workaround for IMPALA-2305. -TEST_F(BufferedBlockMgrTest, DISABLED_WriteErrorBlacklist) { +// write error. The query that encountered the write error should not allocate more +// blocks on that device, but existing blocks on the device will remain in use and future +// queries will use the device. +TEST_F(BufferedBlockMgrTest, WriteErrorBlacklist) { // Set up two buffered block managers with two temporary dirs. vector<string> tmp_dirs = InitMultipleTmpDirs(2); // Simulate two concurrent queries. @@ -1074,50 +1079,71 @@ TEST_F(BufferedBlockMgrTest, DISABLED_WriteErrorBlacklist) { // Delete one file from first scratch dir for first block manager. BufferedBlockMgr::Block* error_block = FindBlockForDir(blocks[error_mgr], error_dir); ASSERT_TRUE(error_block != NULL) << "Expected a tmp file in dir " << error_dir; + const string& error_file_path = error_block->TmpFilePath(); PinBlocks(all_blocks); - DeleteBackingFile(error_block); - UnpinBlocks(all_blocks); // Should succeed since tmp file space was already allocated. 
+ DisableBackingFile(error_file_path); + UnpinBlocks(all_blocks); // Should succeed since writes occur asynchronously WaitForWrites(block_mgrs); - ASSERT_TRUE(block_mgrs[error_mgr]->IsCancelled()); + // Both block managers have a usable tmp directory so should still be usable. + ASSERT_FALSE(block_mgrs[error_mgr]->IsCancelled()); ASSERT_FALSE(block_mgrs[no_error_mgr]->IsCancelled()); - // Temporary device with error should no longer be active. + // Temporary device with error should still be active. vector<TmpFileMgr::DeviceId> active_tmp_devices = - test_env_->tmp_file_mgr()->active_tmp_devices(); - ASSERT_EQ(tmp_dirs.size() - 1, active_tmp_devices.size()); + test_env_->tmp_file_mgr()->ActiveTmpDevices(); + ASSERT_EQ(tmp_dirs.size(), active_tmp_devices.size()); for (int i = 0; i < active_tmp_devices.size(); ++i) { const string& device_path = test_env_->tmp_file_mgr()->GetTmpDirPath(active_tmp_devices[i]); ASSERT_EQ(string::npos, error_dir.find(device_path)); } - // The second block manager should continue using allocated scratch space, since it - // didn't encounter a write error itself. In future this could change but for now it is - // the intended behaviour. + + // The error block manager should only allocate from the device that had no error. + // The non-error block manager should continue using both devices, since it didn't + // encounter a write error itself. 
+ vector<BufferedBlockMgr::Block*> error_new_blocks; + AllocateBlocks( + block_mgrs[error_mgr], clients[error_mgr], blocks_per_mgr, &error_new_blocks); + UnpinBlocks(error_new_blocks); + WaitForWrites(block_mgrs); + EXPECT_TRUE(FindBlockForDir(error_new_blocks, good_dir) != NULL); + EXPECT_TRUE(FindBlockForDir(error_new_blocks, error_dir) == NULL); + for (int i = 0; i < error_new_blocks.size(); ++i) { + LOG(INFO) << "Newly created block backed by file " + << error_new_blocks[i]->TmpFilePath(); + EXPECT_TRUE(BlockInDir(error_new_blocks[i], good_dir)); + } + DeleteBlocks(error_new_blocks); + PinBlocks(blocks[no_error_mgr]); UnpinBlocks(blocks[no_error_mgr]); - ASSERT_TRUE(FindBlockForDir(blocks[no_error_mgr], good_dir) != NULL); - ASSERT_TRUE(FindBlockForDir(blocks[no_error_mgr], error_dir) != NULL); - // The second block manager should avoid using bad directory for new blocks. + WaitForWrites(block_mgrs); + EXPECT_TRUE(FindBlockForDir(blocks[no_error_mgr], good_dir) != NULL); + EXPECT_TRUE(FindBlockForDir(blocks[no_error_mgr], error_dir) != NULL); + + // The second block manager should use the bad directory for new blocks since + // blacklisting is per-manager, not global. vector<BufferedBlockMgr::Block*> no_error_new_blocks; AllocateBlocks(block_mgrs[no_error_mgr], clients[no_error_mgr], blocks_per_mgr, &no_error_new_blocks); UnpinBlocks(no_error_new_blocks); - for (int i = 0; i < no_error_new_blocks.size(); ++i) { - LOG(INFO) << "Newly created block backed by file " - << no_error_new_blocks[i]->TmpFilePath(); - ASSERT_TRUE(BlockInDir(no_error_new_blocks[i], good_dir)); - } - // A new block manager should only use the good dir for backing storage. + WaitForWrites(block_mgrs); + EXPECT_TRUE(FindBlockForDir(no_error_new_blocks, good_dir) != NULL); + EXPECT_TRUE(FindBlockForDir(no_error_new_blocks, error_dir) != NULL); + DeleteBlocks(no_error_new_blocks); + + // A new block manager should use the both dirs for backing storage. 
BufferedBlockMgr::Client* new_client; BufferedBlockMgr* new_block_mgr = CreateMgrAndClient(9999, blocks_per_mgr, block_size_, 0, false, &new_client); vector<BufferedBlockMgr::Block*> new_mgr_blocks; AllocateBlocks(new_block_mgr, new_client, blocks_per_mgr, &new_mgr_blocks); UnpinBlocks(new_mgr_blocks); - for (int i = 0; i < blocks_per_mgr; ++i) { - LOG(INFO) << "New manager Block " << i << " backed by file " - << new_mgr_blocks[i]->TmpFilePath(); - ASSERT_TRUE(BlockInDir(new_mgr_blocks[i], good_dir)); - } + WaitForWrites(block_mgrs); + EXPECT_TRUE(FindBlockForDir(new_mgr_blocks, good_dir) != NULL); + EXPECT_TRUE(FindBlockForDir(new_mgr_blocks, error_dir) != NULL); + DeleteBlocks(new_mgr_blocks); + + DeleteBlocks(all_blocks); } // Check that allocation error resulting from removal of directory results in blocks @@ -1151,7 +1177,7 @@ TEST_F(BufferedBlockMgrTest, AllocationErrorHandling) { // use the good dir. UnpinBlocks(blocks[0]); // Directories remain on active list even when they experience errors. - ASSERT_EQ(2, test_env_->tmp_file_mgr()->num_active_tmp_devices()); + ASSERT_EQ(2, test_env_->tmp_file_mgr()->NumActiveTmpDevices()); // Blocks should not be written to bad dir even if it remains non-writable. UnpinBlocks(blocks[1]); // All writes should succeed. 
@@ -1165,18 +1191,39 @@ TEST_F(BufferedBlockMgrTest, AllocationErrorHandling) { TEST_F(BufferedBlockMgrTest, NoDirsAllocationError) { vector<string> tmp_dirs = InitMultipleTmpDirs(2); int max_num_buffers = 2; + RuntimeState* runtime_state; BufferedBlockMgr::Client* client; - BufferedBlockMgr* block_mgr = - CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, &client); + BufferedBlockMgr* block_mgr = CreateMgrAndClient( + 0, max_num_buffers, block_size_, 0, false, &client, &runtime_state); vector<BufferedBlockMgr::Block*> blocks; AllocateBlocks(block_mgr, client, max_num_buffers, &blocks); for (int i = 0; i < tmp_dirs.size(); ++i) { const string& tmp_scratch_subdir = tmp_dirs[i] + SCRATCH_SUFFIX; chmod(tmp_scratch_subdir.c_str(), 0); } + ErrorLogMap error_log; + runtime_state->GetErrors(&error_log); + ASSERT_TRUE(error_log.empty()); for (int i = 0; i < blocks.size(); ++i) { - ASSERT_FALSE(blocks[i]->Unpin().ok()); + // Writes won't fail until the actual I/O is attempted. + ASSERT_OK(blocks[i]->Unpin()); } + + LOG(INFO) << "Waiting for writes."; + // Write failure should cancel query. 
+ WaitForWrites(block_mgr); + LOG(INFO) << "writes done."; + ASSERT_TRUE(block_mgr->IsCancelled()); + runtime_state->GetErrors(&error_log); + ASSERT_FALSE(error_log.empty()); + stringstream error_string; + PrintErrorMap(&error_string, error_log); + LOG(INFO) << "Errors: " << error_string.str(); + ASSERT_NE( + string::npos, error_string.str().find("No usable scratch files: space could " + "not be allocated in any of the configured " + "scratch directories (--scratch_dirs)")) + << error_string.str(); DeleteBlocks(blocks); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/buffered-block-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc index 0c3d25f..d4e14a2 100644 --- a/be/src/runtime/buffered-block-mgr.cc +++ b/be/src/runtime/buffered-block-mgr.cc @@ -29,12 +29,9 @@ #include <gutil/strings/substitute.h> -DEFINE_bool(disk_spill_encryption, false, "Set this to encrypt and perform an integrity " - "check on all data spilled to disk during a query"); - #include "common/names.h" -using namespace strings; // for Substitute +using namespace strings; // for Substitute namespace impala { @@ -132,11 +129,8 @@ BufferedBlockMgr::Block::Block(BufferedBlockMgr* block_mgr) : buffer_desc_(NULL), block_mgr_(block_mgr), client_(NULL), - write_range_(NULL), - tmp_file_(NULL), valid_data_len_(0), - num_rows_(0) { -} + num_rows_(0) {} Status BufferedBlockMgr::Block::Pin(bool* pinned, Block* release_block, bool unpin) { return block_mgr_->PinBlock(this, pinned, release_block, unpin); @@ -185,8 +179,8 @@ bool BufferedBlockMgr::Block::Validate() const { } string BufferedBlockMgr::Block::TmpFilePath() const { - if (tmp_file_ == NULL) return ""; - return tmp_file_->path(); + if (write_handle_ == NULL) return ""; + return write_handle_->TmpFilePath(); } string BufferedBlockMgr::Block::DebugString() const { @@ -200,6 +194,9 @@ string 
BufferedBlockMgr::Block::DebugString() const { << " Pinned: " << is_pinned_ << endl << " Write Issued: " << in_write_ << endl << " Client Local: " << client_local_ << endl; + if (write_handle_ != NULL) { + ss << " Write handle: " << write_handle_->DebugString() << endl; + } if (client_ != NULL) ss << " Client: " << client_->DebugString(); return ss.str(); } @@ -208,7 +205,7 @@ BufferedBlockMgr::BufferedBlockMgr(RuntimeState* state, TmpFileMgr* tmp_file_mgr int64_t block_size, int64_t scratch_limit) : max_block_size_(block_size), // Keep two writes in flight per scratch disk so the disks can stay busy. - block_write_threshold_(tmp_file_mgr->num_active_tmp_devices() * 2), + block_write_threshold_(tmp_file_mgr->NumActiveTmpDevices() * 2), disable_spill_(state->query_ctx().disable_spilling || block_write_threshold_ == 0 || scratch_limit == 0), query_id_(state->query_id()), @@ -217,7 +214,6 @@ BufferedBlockMgr::BufferedBlockMgr(RuntimeState* state, TmpFileMgr* tmp_file_mgr total_pinned_buffers_(0), non_local_outstanding_writes_(0), tmp_file_group_(NULL), - io_mgr_(state->io_mgr()), is_cancelled_(false), writes_issued_(0), debug_write_delay_ms_(0) {} @@ -356,7 +352,7 @@ bool BufferedBlockMgr::ConsumeMemory(Client* client, int64_t size) { // If we either couldn't acquire enough buffers or WriteUnpinnedBlocks() failed, undo // the reservation. if (buffers_acquired != buffers_needed || !status.ok()) { - if (!status.ok()) { + if (!status.ok() && !status.IsCancelled()) { VLOG_QUERY << "Query: " << query_id_ << " write unpinned buffers failed."; client->state_->LogError(status.msg()); } @@ -388,8 +384,6 @@ void BufferedBlockMgr::Cancel() { if (is_cancelled_) return; is_cancelled_ = true; } - // Cancel the underlying io mgr to unblock any waiting threads. - io_mgr_->CancelContext(io_request_context_); } bool BufferedBlockMgr::IsCancelled() { @@ -548,14 +542,13 @@ BufferedBlockMgr::~BufferedBlockMgr() { // Do not do that with 'static_block_mgrs_lock_' held. 
other_mgr_ptr.reset(); - if (io_request_context_ != NULL) io_mgr_->UnregisterContext(io_request_context_); + // Delete tmp files and cancel any in-flight writes. + tmp_file_group_->Close(); // If there are any outstanding writes and we are here it means that when the // WriteComplete() callback gets executed it is going to access invalid memory. // See IMPALA-1890. DCHECK_EQ(non_local_outstanding_writes_, 0) << endl << DebugInternal(); - // Delete tmp files. - tmp_file_group_->Close(); // Validate that clients deleted all of their blocks. Since all writes have // completed at this point, any deleted blocks should be in unused_blocks_. @@ -591,6 +584,13 @@ MemTracker* BufferedBlockMgr::get_tracker(Client* client) const { return client->tracker_; } +int64_t BufferedBlockMgr::GetNumWritesOutstanding() { + // Acquire lock to avoid returning mid-way through WriteComplete() when the + // state may be inconsistent. + lock_guard<mutex> lock(lock_); + return profile()->GetCounter("BlockWritesOutstanding")->value(); +} + Status BufferedBlockMgr::DeleteOrUnpinBlock(Block* block, bool unpin) { if (block == NULL) { return IsCancelled() ? Status::CANCELLED : Status::OK(); @@ -619,8 +619,10 @@ Status BufferedBlockMgr::PinBlock(Block* block, bool* pinned, Block* release_blo if (!status.ok()) goto error; *pinned = block->is_pinned_; - // Block was not evicted or had no data, nothing left to do. - if (in_mem || block->valid_data_len_ == 0) { + if (in_mem) { + // The block's buffer is still in memory with the original data. + status = CancelWrite(block); + if (!status.ok()) goto error; return DeleteOrUnpinBlock(release_block, unpin); } @@ -628,6 +630,9 @@ Status BufferedBlockMgr::PinBlock(Block* block, bool* pinned, Block* release_blo if (release_block == NULL) return Status::OK(); if (block->buffer_desc_ != NULL) { + // The block's buffer is still in memory but we couldn't get an additional buffer + // because it would eat into another client's reservation. 
However, we can use + // release_block's reservation, so reclaim the buffer. { lock_guard<mutex> lock(lock_); if (free_io_buffers_.Contains(block->buffer_desc_)) { @@ -646,9 +651,12 @@ Status BufferedBlockMgr::PinBlock(Block* block, bool* pinned, Block* release_blo status = WriteUnpinnedBlocks(); if (!status.ok()) goto error; } + status = CancelWrite(block); + if (!status.ok()) goto error; return DeleteOrUnpinBlock(release_block, unpin); } - + // FindBufferForBlock() wasn't able to find a buffer so transfer the one from + // 'release_block'. status = TransferBuffer(block, release_block, unpin); if (!status.ok()) goto error; DCHECK(!release_block->is_pinned_); @@ -657,33 +665,14 @@ Status BufferedBlockMgr::PinBlock(Block* block, bool* pinned, Block* release_blo *pinned = true; } - DCHECK(block->write_range_ != NULL) << block->DebugString() << endl << release_block; - - { - // Read the block from disk if it was not in memory. - SCOPED_TIMER(disk_read_timer_); - // Create a ScanRange to perform the read. - DiskIoMgr::ScanRange* scan_range = - obj_pool_.Add(new DiskIoMgr::ScanRange()); - scan_range->Reset(NULL, block->write_range_->file(), block->write_range_->len(), - block->write_range_->offset(), block->write_range_->disk_id(), false, - DiskIoMgr::BufferOpts::ReadInto(block->buffer(), block->buffer_len())); - DiskIoMgr::BufferDescriptor* io_mgr_buffer; - status = io_mgr_->Read(io_request_context_, scan_range, &io_mgr_buffer); - if (!status.ok()) goto error; - - DCHECK_EQ(io_mgr_buffer->buffer(), block->buffer()); - DCHECK_EQ(io_mgr_buffer->len(), block->valid_data_len()); - DCHECK(io_mgr_buffer->eosr()); - io_mgr_buffer->Return(); - } + DCHECK(block->write_handle_ != NULL) << block->DebugString() << endl << release_block; - if (FLAGS_disk_spill_encryption) { - // Decryption is done in-place, since the buffer can't be accessed by anyone else. - status = CheckHashAndDecrypt(block); + // The block is on disk - read it back into memory. 
+ if (block->valid_data_len() > 0) { + status = tmp_file_group_->Read(block->write_handle_.get(), block->valid_data()); if (!status.ok()) goto error; } - + tmp_file_group_->DestroyWriteHandle(move(block->write_handle_)); return DeleteOrUnpinBlock(release_block, unpin); error: @@ -693,6 +682,24 @@ error: return status; } +Status BufferedBlockMgr::CancelWrite(Block* block) { + { + unique_lock<mutex> lock(lock_); + DCHECK(block->buffer_desc_ != NULL); + // If there is an in-flight write, wait for it to finish. This is sub-optimal + // compared to just cancelling the write, but reduces the number of possible + // code paths in this legacy code. + WaitForWrite(lock, block); + if (is_cancelled_) return Status::CANCELLED; + } + if (block->write_handle_ != NULL) { + // Restore the in-memory data without reading from disk (e.g. decrypt it). + RETURN_IF_ERROR(tmp_file_group_->CancelWriteAndRestoreData( + move(block->write_handle_), block->valid_data())); + } + return Status::OK(); +} + Status BufferedBlockMgr::UnpinBlock(Block* block) { DCHECK(!block->is_deleted_) << "Unpin for deleted block."; @@ -738,49 +745,17 @@ Status BufferedBlockMgr::WriteUnpinnedBlock(Block* block) { // Assumes block manager lock is already taken. DCHECK(!block->is_pinned_) << block->DebugString(); DCHECK(!block->in_write_) << block->DebugString(); + DCHECK(block->write_handle_ == NULL) << block->DebugString(); DCHECK_EQ(block->buffer_desc_->len, max_block_size_); - if (block->write_range_ == NULL) { - if (tmp_file_group_->NumFiles() == 0) { - RETURN_IF_ERROR(tmp_file_group_->CreateFiles(query_id_)); - } + // The block is on disk - read it back into memory. + RETURN_IF_ERROR(tmp_file_group_->Write(block->valid_data(), + [this, block](const Status& write_status) { WriteComplete(block, write_status); }, + &block->write_handle_)); - // First time the block is being persisted - need to allocate tmp file space. 
- TmpFileMgr::File* tmp_file; - int64_t file_offset; - RETURN_IF_ERROR( - tmp_file_group_->AllocateSpace(max_block_size_, &tmp_file, &file_offset)); - int disk_id = tmp_file->disk_id(); - if (disk_id < 0) { - // Assign a valid disk id to the write range if the tmp file was not assigned one. - static unsigned int next_disk_id = 0; - disk_id = ++next_disk_id; - } - disk_id %= io_mgr_->num_local_disks(); - DiskIoMgr::WriteRange::WriteDoneCallback callback = - bind(mem_fn(&BufferedBlockMgr::WriteComplete), this, block, _1); - block->write_range_ = obj_pool_.Add(new DiskIoMgr::WriteRange( - tmp_file->path(), file_offset, disk_id, callback)); - block->tmp_file_ = tmp_file; - } - - uint8_t* outbuf = NULL; - if (FLAGS_disk_spill_encryption) { - // The block->buffer() could be accessed during the write path, so we have to - // make a copy of it while writing. - RETURN_IF_ERROR(EncryptAndHash(block, &outbuf)); - } else { - outbuf = block->buffer(); - } - - block->write_range_->SetData(outbuf, block->valid_data_len_); - - // Issue write through DiskIoMgr. - RETURN_IF_ERROR(io_mgr_->AddWriteRange(io_request_context_, block->write_range_)); block->in_write_ = true; DCHECK(block->Validate()) << endl << block->DebugString(); outstanding_writes_counter_->Add(1); - bytes_written_counter_->Add(block->valid_data_len_); ++writes_issued_; if (writes_issued_ == 1) { if (ImpaladMetrics::NUM_QUERIES_SPILLED != NULL) { @@ -805,25 +780,22 @@ void BufferedBlockMgr::WriteComplete(Block* block, const Status& write_status) { #endif Status status = Status::OK(); lock_guard<mutex> lock(lock_); - outstanding_writes_counter_->Add(-1); DCHECK(Validate()) << endl << DebugInternal(); DCHECK(is_cancelled_ || block->in_write_) << "WriteComplete() for block not in write." 
- << endl << block->DebugString(); + << endl + << block->DebugString(); DCHECK(block->buffer_desc_ != NULL); + + outstanding_writes_counter_->Add(-1); if (!block->client_local_) { DCHECK_GT(non_local_outstanding_writes_, 0) << block->DebugString(); --non_local_outstanding_writes_; } block->in_write_ = false; - // Explicitly release our temporarily allocated buffer here so that it doesn't - // hang around needlessly. - if (FLAGS_disk_spill_encryption) EncryptedWriteComplete(block); - // ReturnUnusedBlock() will clear the block, so save required state in local vars. // state is not valid if the block was deleted because the state may be torn down // after the state's fragment has deleted all of its blocks. - TmpFileMgr::File* tmp_file = block->tmp_file_; RuntimeState* state = block->is_deleted_ ? NULL : block->client_->state_; // If the block was re-pinned when it was in the IOMgr queue, don't free it. @@ -847,18 +819,17 @@ void BufferedBlockMgr::WriteComplete(Block* block, const Status& write_status) { if (!write_status.ok() || !status.ok() || is_cancelled_) { VLOG_FILE << "Query: " << query_id_ << ". Write did not complete successfully: " - "write_status=" << write_status.GetDetail() << ", status=" << status.GetDetail() - << ". is_cancelled_=" << is_cancelled_; - + "write_status=" + << write_status.GetDetail() << ", status=" << status.GetDetail() + << ". is_cancelled_=" << is_cancelled_; // If the instance is already cancelled, don't confuse things with these errors. if (!write_status.ok() && !write_status.IsCancelled()) { // Report but do not attempt to recover from write error. 
- DCHECK(tmp_file != NULL); - if (!write_status.IsMemLimitExceeded()) tmp_file->ReportIOError(write_status.msg()); VLOG_QUERY << "Query: " << query_id_ << " write complete callback with error."; + if (state != NULL) state->LogError(write_status.msg()); } - if (!status.ok()) { + if (!status.ok() && !status.IsCancelled()) { VLOG_QUERY << "Query: " << query_id_ << " error while writing unpinned blocks."; if (state != NULL) state->LogError(status.msg()); } @@ -875,6 +846,7 @@ void BufferedBlockMgr::WriteComplete(Block* block, const Status& write_status) { if (!block->client_local_) buffer_available_cv_.notify_all(); if (block->is_deleted_) { // Finish the DeleteBlock() work. + tmp_file_group_->DestroyWriteHandle(move(block->write_handle_)); block->buffer_desc_->block = NULL; block->buffer_desc_ = NULL; ReturnUnusedBlock(block); @@ -913,7 +885,9 @@ void BufferedBlockMgr::DeleteBlockLocked(const unique_lock<mutex>& lock, Block* if (block->in_write_) { DCHECK(block->buffer_desc_ != NULL && block->buffer_desc_->len == max_block_size_) << "Should never be writing a small buffer"; - // If a write is still pending, return. Cleanup will be done in WriteComplete(). + // If a write is still pending, cancel it and return. Cleanup will be done in + // WriteComplete(). Cancelling the write ensures that it won't try to log to the + // RuntimeState (which may be torn down before the block manager). DCHECK(block->Validate()) << endl << block->DebugString(); return; } @@ -935,6 +909,12 @@ void BufferedBlockMgr::DeleteBlockLocked(const unique_lock<mutex>& lock, Block* block->buffer_desc_ = NULL; } } + + // Discard any on-disk data. The write is finished so this won't call back into + // BufferedBlockMgr. 
+ if (block->write_handle_ != NULL) { + tmp_file_group_->DestroyWriteHandle(move(block->write_handle_)); + } ReturnUnusedBlock(block); DCHECK(block->Validate()) << endl << block->DebugString(); DCHECK(Validate()) << endl << DebugInternal(); @@ -1224,7 +1204,7 @@ string BufferedBlockMgr::DebugString(Client* client) { string BufferedBlockMgr::DebugInternal() const { stringstream ss; - ss << "Buffered block mgr" << endl + ss << "Buffered block mgr " << this << endl << " Num writes outstanding: " << outstanding_writes_counter_->value() << endl << " Num free io buffers: " << free_io_buffers_.size() << endl << " Num unpinned blocks: " << unpinned_blocks_.size() << endl @@ -1234,6 +1214,7 @@ string BufferedBlockMgr::DebugInternal() const { << " Remaining memory: " << mem_tracker_->SpareCapacity() << " (#blocks=" << (mem_tracker_->SpareCapacity() / max_block_size_) << ")" << endl << " Block write threshold: " << block_write_threshold_; + if (tmp_file_group_ != NULL) ss << tmp_file_group_->DebugString(); return ss.str(); } @@ -1243,13 +1224,11 @@ void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, TmpFileMgr* tmp_file_mgr, unique_lock<mutex> l(lock_); if (initialized_) return; - io_mgr->RegisterContext(&io_request_context_, NULL); - profile_.reset(new RuntimeProfile(&obj_pool_, "BlockMgr")); parent_profile->AddChild(profile_.get()); - tmp_file_group_.reset( - new TmpFileMgr::FileGroup(tmp_file_mgr, profile_.get(), scratch_limit)); + tmp_file_group_.reset(new TmpFileMgr::FileGroup( + tmp_file_mgr, io_mgr, profile_.get(), query_id_, max_block_size_, scratch_limit)); mem_limit_counter_ = ADD_COUNTER(profile_.get(), "MemoryLimit", TUnit::BYTES); mem_limit_counter_->Set(mem_limit); @@ -1257,13 +1236,10 @@ void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, TmpFileMgr* tmp_file_mgr, block_size_counter_->Set(max_block_size_); created_block_counter_ = ADD_COUNTER(profile_.get(), "BlocksCreated", TUnit::UNIT); recycled_blocks_counter_ = ADD_COUNTER(profile_.get(), "BlocksRecycled", 
TUnit::UNIT); - bytes_written_counter_ = ADD_COUNTER(profile_.get(), "BytesWritten", TUnit::BYTES); outstanding_writes_counter_ = ADD_COUNTER(profile_.get(), "BlockWritesOutstanding", TUnit::UNIT); buffered_pin_counter_ = ADD_COUNTER(profile_.get(), "BufferedPins", TUnit::UNIT); - disk_read_timer_ = ADD_TIMER(profile_.get(), "TotalReadBlockTime"); buffer_wait_timer_ = ADD_TIMER(profile_.get(), "TotalBufferWaitTime"); - encryption_timer_ = ADD_TIMER(profile_.get(), "TotalEncryptionTime"); // Create a new mem_tracker and allocate buffers. mem_tracker_.reset( @@ -1272,45 +1248,4 @@ void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, TmpFileMgr* tmp_file_mgr, initialized_ = true; } -Status BufferedBlockMgr::EncryptAndHash(Block* block, uint8_t** outbuf) { - DCHECK(FLAGS_disk_spill_encryption); - DCHECK(block->buffer()); - DCHECK(!block->is_pinned_); - DCHECK(!block->in_write_); - DCHECK(outbuf); - SCOPED_TIMER(encryption_timer_); - // Encrypt to a temporary buffer since so that the original data is still in the buffer - // if the block is re-pinned while the write is still in-flight. - block->encrypted_write_buffer_.reset(new uint8_t[block->valid_data_len_]); - // Since we're using AES-CFB mode, we must take care not to reuse a key/IV pair. - // Regenerate a new key and IV for every block of data we write, including between - // writes of the same Block. 
- block->key_.InitializeRandom(); - RETURN_IF_ERROR(block->key_.Encrypt( - block->buffer(), block->valid_data_len_, block->encrypted_write_buffer_.get())); - - block->hash_.Compute(block->encrypted_write_buffer_.get(), block->valid_data_len_); - - *outbuf = block->encrypted_write_buffer_.get(); - return Status::OK(); -} - -void BufferedBlockMgr::EncryptedWriteComplete(Block* block) { - DCHECK(FLAGS_disk_spill_encryption); - DCHECK(block->encrypted_write_buffer_.get()); - block->encrypted_write_buffer_.reset(); -} - -Status BufferedBlockMgr::CheckHashAndDecrypt(Block* block) { - DCHECK(FLAGS_disk_spill_encryption); - DCHECK(block->buffer()); - SCOPED_TIMER(encryption_timer_); - - if (!block->hash_.Verify(block->buffer(), block->valid_data_len_)) { - return Status("Block verification failure"); - } - // Decrypt block->buffer() in-place. Safe because no one is accessing it. - return block->key_.Decrypt(block->buffer(), block->valid_data_len_, block->buffer()); -} - } // namespace impala http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/buffered-block-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-block-mgr.h b/be/src/runtime/buffered-block-mgr.h index 269b707..bbe8429 100644 --- a/be/src/runtime/buffered-block-mgr.h +++ b/be/src/runtime/buffered-block-mgr.h @@ -20,7 +20,7 @@ #include "runtime/disk-io-mgr.h" #include "runtime/tmp-file-mgr.h" -#include "util/openssl-util.h" +#include "util/mem-range.h" namespace impala { @@ -175,6 +175,13 @@ class BufferedBlockMgr { return buffer_desc_->buffer; } + /// Returns a reference to the valid data in the block's buffer. Only guaranteed to + /// be valid if the block is pinned. + MemRange valid_data() const { + DCHECK(buffer_desc_ != NULL); + return MemRange(buffer_desc_->buffer, valid_data_len_); + } + /// Return the number of bytes allocated in this block. 
int64_t valid_data_len() const { return valid_data_len_; } @@ -223,17 +230,9 @@ class BufferedBlockMgr { /// The client that owns this block. Client* client_; - /// WriteRange object representing the on-disk location used to persist a block. - /// Is created the first time a block is persisted, and retained until the block - /// object is destroyed. The file location and offset in write_range_ are valid - /// throughout the lifetime of this object, but the data and length in the - /// write_range_ are only valid while the block is being written. - /// write_range_ instance is owned by the block manager. - DiskIoMgr::WriteRange* write_range_; - - /// The file this block belongs to. The lifetime is the same as the file location - /// and offset in write_range_. The File is owned by BufferedBlockMgr, not TmpFileMgr. - TmpFileMgr::File* tmp_file_; + /// Non-NULL when the block data is written to scratch or is in the process of being + /// written. + std::unique_ptr<TmpFileMgr::WriteHandle> write_handle_; /// Length of valid (i.e. allocated) data within the block. int64_t valid_data_len_; @@ -241,20 +240,6 @@ class BufferedBlockMgr { /// Number of rows in this block. int num_rows_; - /// If --disk_spill_encryption is on, in the write path we allocate a new buffer to - /// hold encrypted data while it's being written to disk. In the read path, we can - /// instead decrypt data in place since no one else because the read buffer isn't - /// accessible to any other threads until Pin() returns. - boost::scoped_array<uint8_t> encrypted_write_buffer_; - - /// If --disk_spill_encryption is on, a AES 256-bit key and initialization vector. - /// Regenerated on each write. - EncryptionKey key_; - - /// If --disk_spill_encryption is on, our hash of the data being written. Filled in - /// on writes; verified on reads. This is calculated _after_ encryption. - IntegrityHash hash_; - /// Block state variables. 
The block's buffer can be freed only if is_pinned_ and /// in_write_ are both false. @@ -335,8 +320,8 @@ class BufferedBlockMgr { /// - If there is memory pressure, block will get the buffer from 'unpin_block'. Status GetNewBlock(Client* client, Block* unpin_block, Block** block, int64_t len = -1); - /// Cancels the block mgr. All subsequent calls that return a Status fail with - /// Status::CANCELLED. Idempotent. + /// Test helper to cancel the block mgr. All subsequent calls that return a Status fail + /// with Status::CANCELLED. Idempotent. void Cancel(); /// Returns true if the block manager was cancelled. @@ -360,10 +345,6 @@ class BufferedBlockMgr { /// ReleaseMemory() call. void ReleaseMemory(Client* client, int64_t size); - /// The number of buffers available for client. That is, if all other clients were - /// stopped, the number of buffers this client could get. - int64_t available_buffers(Client* client) const; - /// Returns a MEM_LIMIT_EXCEEDED error which includes the minimum memory required by /// this 'client' that acts on behalf of the node with id 'node_id'. 'node_id' is used /// only for error reporting. @@ -381,6 +362,7 @@ class BufferedBlockMgr { void set_debug_write_delay_ms(int val) { debug_write_delay_ms_ = val; } private: + friend class BufferedBlockMgrTest; friend struct Client; /// Descriptor for a single memory buffer in the pool. @@ -415,6 +397,12 @@ class BufferedBlockMgr { void DeleteBlock(Block* block); void DeleteBlockLocked(const boost::unique_lock<boost::mutex>& lock, Block* block); + /// If there is an in-flight write, cancel the write and restore the contents of the + /// block's buffer. If no write has been started for 'block', does nothing. 'block' + /// must have an associated buffer. Returns an error status if an error is encountered + /// while cancelling the write or CANCELLED if the block mgr is cancelled. + Status CancelWrite(Block* block); + /// If the 'block' is NULL, checks if cancelled and returns. 
Otherwise, depending on /// 'unpin' calls either DeleteBlock() or UnpinBlock(), which both first check for /// cancellation. It should be called without the lock_ acquired. @@ -428,6 +416,10 @@ class BufferedBlockMgr { /// The caller should not hold 'lock_'. Status TransferBuffer(Block* dst, Block* src, bool unpin); + /// The number of buffers available for client. That is, if all other clients were + /// stopped, the number of buffers this client could get. + int64_t available_buffers(Client* client) const; + /// Returns the total number of unreserved buffers. This is the sum of unpinned, /// free and buffers we can still allocate minus the total number of reserved buffers /// that are not pinned. @@ -461,8 +453,8 @@ class BufferedBlockMgr { /// Issues the write for this block to the DiskIoMgr. Status WriteUnpinnedBlock(Block* block); - /// Wait until either the write for 'block' completes or the block mgr is cancelled. - /// 'lock_' must be held with 'lock'. + /// Wait until either there is no in-flight write for 'block' or the block mgr is + /// cancelled. 'lock_' must be held with 'lock'. void WaitForWrite(boost::unique_lock<boost::mutex>& lock, Block* block); /// Callback used by DiskIoMgr to indicate a block write has completed. write_status @@ -481,6 +473,9 @@ class BufferedBlockMgr { /// Non-blocking and needs no lock_. Block* GetUnusedBlock(Client* client); + // Test helper to get the number of block writes currently outstanding. + int64_t GetNumWritesOutstanding(); + /// Used to debug the state of the block manager. Lock must already be taken. bool Validate() const; std::string DebugInternal() const; @@ -559,10 +554,6 @@ class BufferedBlockMgr { /// blocks may be written. Blocks are round-robined across these files. boost::scoped_ptr<TmpFileMgr::FileGroup> tmp_file_group_; - /// DiskIoMgr handles to read and write blocks. - DiskIoMgr* io_mgr_; - DiskIoRequestContext* io_request_context_; - /// If true, a disk write failed and all API calls return. 
/// Status::CANCELLED. Set to true if there was an error writing a block, or if /// WriteComplete() needed to reissue the write and that failed. @@ -584,21 +575,12 @@ class BufferedBlockMgr { /// Number of Pin() calls that did not require a disk read. RuntimeProfile::Counter* buffered_pin_counter_; - /// Time taken for disk reads. - RuntimeProfile::Counter* disk_read_timer_; - /// Time spent waiting for a free buffer. RuntimeProfile::Counter* buffer_wait_timer_; - /// Number of bytes written to disk (includes writes still queued in the IO manager). - RuntimeProfile::Counter* bytes_written_counter_; - /// Number of writes outstanding (issued but not completed). RuntimeProfile::Counter* outstanding_writes_counter_; - /// Time spent in disk spill encryption, decryption, and integrity checking. - RuntimeProfile::Counter* encryption_timer_; - /// Number of writes issued. int writes_issued_; @@ -609,21 +591,9 @@ class BufferedBlockMgr { /// map contains only weak ptrs. BufferedBlockMgrs that are handed out are shared ptrs. /// When all the shared ptrs are no longer referenced, the BufferedBlockMgr /// d'tor will be called at which point the weak ptr will be removed from the map. - typedef boost::unordered_map<TUniqueId, std::weak_ptr<BufferedBlockMgr>> - BlockMgrsMap; + typedef boost::unordered_map<TUniqueId, std::weak_ptr<BufferedBlockMgr>> BlockMgrsMap; static BlockMgrsMap query_to_block_mgrs_; - /// Takes the data in buffer(), allocates 'encrypted_write_buffer_', encrypts the data - /// into 'encrypted_write_buffer_' and computes 'hash_'. Returns a pointer to the - /// encrypted data in 'outbuf'. - Status EncryptAndHash(Block* block, uint8_t** outbuf); - - /// Deallocates the block's encrypted write buffer alloced in EncryptAndHash(). - void EncryptedWriteComplete(Block* block); - - /// Verifies the integrity hash and decrypts the contents of buffer() in place. - Status CheckHashAndDecrypt(Block* block); - /// Debug option to delay write completion. 
int debug_write_delay_ms_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/disk-io-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc index 9f8d6d7..016b14f 100644 --- a/be/src/runtime/disk-io-mgr-test.cc +++ b/be/src/runtime/disk-io-mgr-test.cc @@ -239,12 +239,12 @@ TEST_F(DiskIoMgrTest, SingleWriter) { read_io_mgr.reset(); } -// Perform invalid writes (e.g. non-existent file, negative offset) and validate -// that an error status is returned via the write callback. +// Perform invalid writes (e.g. file in non-existent directory, negative offset) and +// validate that an error status is returned via the write callback. TEST_F(DiskIoMgrTest, InvalidWrite) { MemTracker mem_tracker(LARGE_MEM_LIMIT); num_ranges_written_ = 0; - string tmp_file = "/tmp/non-existent.txt"; + string tmp_file = "/non-existent/file.txt"; DiskIoMgr io_mgr(1, 1, 1, 10); ASSERT_OK(io_mgr.Init(&mem_tracker)); DiskIoRequestContext* writer; @@ -252,12 +252,12 @@ TEST_F(DiskIoMgrTest, InvalidWrite) { int32_t* data = pool_->Add(new int32_t); *data = rand(); - // Write to a non-existent file. + // Write to file in non-existent directory. 
DiskIoMgr::WriteRange** new_range = pool_->Add(new DiskIoMgr::WriteRange*); DiskIoMgr::WriteRange::WriteDoneCallback callback = - bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, - new_range, (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL, - data, Status(TErrorCode::RUNTIME_ERROR, "Test Failure"), _1); + bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, new_range, + (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL, data, + Status(TErrorCode::RUNTIME_ERROR, "Test Failure"), _1); *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, rand(), 0, callback)); (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/disk-io-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc index 87ac33a..20cb9b5 100644 --- a/be/src/runtime/disk-io-mgr.cc +++ b/be/src/runtime/disk-io-mgr.cc @@ -78,6 +78,8 @@ static const int LOW_MEMORY = 64 * 1024 * 1024; const int DiskIoMgr::DEFAULT_QUEUE_CAPACITY = 2; +AtomicInt32 DiskIoMgr::next_disk_id_; + namespace detail { // Indicates if file handle caching should be used static inline bool is_file_handle_caching_enabled() { @@ -262,6 +264,11 @@ void DiskIoMgr::BufferDescriptor::Return() { DiskIoMgr::WriteRange::WriteRange( const string& file, int64_t file_offset, int disk_id, WriteDoneCallback callback) : RequestRange(RequestType::WRITE), callback_(callback) { + SetRange(file, file_offset, disk_id); +} + +void DiskIoMgr::WriteRange::SetRange( + const std::string& file, int64_t file_offset, int disk_id) { file_ = file; offset_ = file_offset; disk_id_ = disk_id; @@ -947,8 +954,11 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range, return false; } -void DiskIoMgr::HandleWriteFinished(DiskIoRequestContext* writer, WriteRange* write_range, - const Status& write_status) { +void 
DiskIoMgr::HandleWriteFinished( + DiskIoRequestContext* writer, WriteRange* write_range, const Status& write_status) { + // Copy disk_id before running callback: the callback may modify write_range. + int disk_id = write_range->disk_id_; + // Execute the callback before decrementing the thread count. Otherwise CancelContext() // that waits for the disk ref count to be 0 will return, creating a race, e.g. // between BufferedBlockMgr::WriteComplete() and BufferedBlockMgr::~BufferedBlockMgr(). @@ -958,7 +968,7 @@ void DiskIoMgr::HandleWriteFinished(DiskIoRequestContext* writer, WriteRange* wr { unique_lock<mutex> writer_lock(writer->lock_); DCHECK(writer->Validate()) << endl << writer->DebugString(); - DiskIoRequestContext::PerDiskState& state = writer->disk_states_[write_range->disk_id_]; + DiskIoRequestContext::PerDiskState& state = writer->disk_states_[disk_id]; if (writer->state_ == DiskIoRequestContext::Cancelled) { state.DecrementRequestThreadAndCheckDone(writer); } else { @@ -1152,13 +1162,24 @@ DiskIoMgr::BufferDescriptor* DiskIoMgr::TryAllocateNextBufferForRange( } void DiskIoMgr::Write(DiskIoRequestContext* writer_context, WriteRange* write_range) { - FILE* file_handle = fopen(write_range->file(), "rb+"); - Status ret_status; - if (file_handle == NULL) { + Status ret_status = Status::OK(); + FILE* file_handle = NULL; + // Raw open() syscall will create file if not present when passed these flags. 
+ int fd = open(write_range->file(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR); + if (fd < 0) { ret_status = Status(ErrorMsg(TErrorCode::RUNTIME_ERROR, - Substitute("fopen($0, \"rb+\") failed with errno=$1 description=$2", - write_range->file_, errno, GetStrErrMsg()))); + Substitute("Opening '$0' for write failed with errno=$1 description=$2", + write_range->file_, errno, GetStrErrMsg()))); } else { + file_handle = fdopen(fd, "wb"); + if (file_handle == NULL) { + ret_status = Status(ErrorMsg(TErrorCode::RUNTIME_ERROR, + Substitute("fdopen($0, \"wb\") failed with errno=$1 description=$2", fd, errno, + GetStrErrMsg()))); + } + } + + if (file_handle != NULL) { ret_status = WriteRangeHelper(file_handle, write_range); int success = fclose(file_handle); @@ -1225,9 +1246,8 @@ int DiskIoMgr::AssignQueue(const char* file, int disk_id, bool expected_local) { // Assign to a local disk queue. DCHECK(!IsS3APath(file)); // S3 is always remote. if (disk_id == -1) { - // disk id is unknown, assign it a random one. - static int next_disk_id = 0; - disk_id = next_disk_id++; + // disk id is unknown, assign it an arbitrary one. + disk_id = next_disk_id_.Add(1); } // TODO: we need to parse the config for the number of dirs configured for this // data node. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/disk-io-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h index 4b650a8..c67f69d 100644 --- a/be/src/runtime/disk-io-mgr.h +++ b/be/src/runtime/disk-io-mgr.h @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. - #ifndef IMPALA_RUNTIME_DISK_IO_MGR_H #define IMPALA_RUNTIME_DISK_IO_MGR_H +#include <functional> #include <list> #include <vector> @@ -599,14 +599,22 @@ class DiskIoMgr { /// (TStatusCode::CANCELLED). The callback is only invoked if this WriteRange was /// successfully added (i.e. 
AddWriteRange() succeeded). No locks are held while /// the callback is invoked. - typedef boost::function<void (const Status&)> WriteDoneCallback; + typedef std::function<void(const Status&)> WriteDoneCallback; WriteRange(const std::string& file, int64_t file_offset, int disk_id, WriteDoneCallback callback); + /// Change the file and offset of this write range. Data and callbacks are unchanged. + /// Can only be called when the write is not in flight (i.e. before AddWriteRange() + /// is called or after the write callback was called). + void SetRange(const std::string& file, int64_t file_offset, int disk_id); + /// Set the data and number of bytes to be written for this WriteRange. - /// File data can be over-written by calling SetData() and AddWriteRange(). + /// Can only be called when the write is not in flight (i.e. before AddWriteRange() + /// is called or after the write callback was called). void SetData(const uint8_t* buffer, int64_t len); + const uint8_t* data() const { return data_; } + private: friend class DiskIoMgr; friend class DiskIoRequestContext; @@ -855,6 +863,11 @@ class DiskIoMgr { /// It is indexed by disk id. std::vector<DiskQueue*> disk_queues_; + /// The next disk queue to write to if the actual 'disk_id_' is unknown (i.e. the file + /// is not associated with a particular local disk or remote queue). Used to implement + /// round-robin assignment for that case. + static AtomicInt32 next_disk_id_; + // Caching structure that maps file names to cached file handles. The cache has an upper // limit of entries defined by FLAGS_max_cached_file_handles. Evicted cached file // handles are closed. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/exec-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc index ad996e3..d93a459 100644 --- a/be/src/runtime/exec-env.cc +++ b/be/src/runtime/exec-env.cc @@ -171,7 +171,7 @@ ExecEnv::ExecEnv() scheduler_.reset(new SimpleScheduler( addresses, metrics_.get(), webserver_.get(), request_pool_service_.get())); } - if (exec_env_ == NULL) exec_env_ = this; + exec_env_ = this; } // TODO: Need refactor to get rid of duplicated code. @@ -224,12 +224,10 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port, scheduler_.reset(new SimpleScheduler( addresses, metrics_.get(), webserver_.get(), request_pool_service_.get())); } - if (exec_env_ == NULL) exec_env_ = this; + exec_env_ = this; } - -ExecEnv::~ExecEnv() { -} +ExecEnv::~ExecEnv() {} Status ExecEnv::InitForFeTests() { mem_tracker_.reset(new MemTracker(-1, "Process")); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/exec-env.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h index 08ddd9f..be90a5a 100644 --- a/be/src/runtime/exec-env.h +++ b/be/src/runtime/exec-env.h @@ -61,7 +61,7 @@ class ExecEnv { /// Returns the first created exec env instance. In a normal impalad, this is /// the only instance. In test setups with multiple ExecEnv's per process, - /// we return the first instance. + /// we return the most recently created instance. 
static ExecEnv* GetInstance() { return exec_env_; } /// Empty destructor because the compiler-generated one requires full http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/query-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc index 2757750..def95c0 100644 --- a/be/src/runtime/query-state.cc +++ b/be/src/runtime/query-state.cc @@ -45,8 +45,6 @@ QueryState::QueryState(const TQueryCtx& query_ctx) // how many are distinct. It is defined as the sum of the number of generic errors and // the number of distinct other errors. if (query_options.max_errors <= 0) { - // TODO: fix linker error and uncomment this - //query_options_.max_errors = FLAGS_max_errors; query_options.max_errors = 100; } if (query_options.batch_size <= 0) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/tmp-file-mgr-internal.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr-internal.h b/be/src/runtime/tmp-file-mgr-internal.h new file mode 100644 index 0000000..dd8bd07 --- /dev/null +++ b/be/src/runtime/tmp-file-mgr-internal.h @@ -0,0 +1,93 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_RUNTIME_TMP_FILE_MGR_INTERNAL_H +#define IMPALA_RUNTIME_TMP_FILE_MGR_INTERNAL_H + +#include <string> + +#include "runtime/tmp-file-mgr.h" + +namespace impala { + +/// File is a handle to a physical file in a temporary directory. File space +/// can be allocated and files removed using AllocateSpace() and Remove(). Used +/// internally by TmpFileMgr. +/// +/// Creation of the physical file in the file system is deferred until the file is +/// written by DiskIoMgr. +/// +/// Methods of File are not thread-safe. +class TmpFileMgr::File { + public: + File(FileGroup* file_group, DeviceId device_id, const std::string& path); + + /// Allocates 'num_bytes' bytes in this file for a new block of data. + /// The file size is increased by a call to truncate() if necessary. + /// Returns Status::OK() and sets 'offset' to the file offset of the first + /// byte in the allocated range on success. + /// Returns an error status if an unexpected error occurs, e.g. the file could not + /// be created. + Status AllocateSpace(int64_t num_bytes, int64_t* offset); + + /// Called when an IO error is encountered for this file. Logs the error and blacklists + /// the file. + void Blacklist(const ErrorMsg& msg); + + /// Delete the physical file on disk, if one was created. + /// It is not valid to read or write to a file after calling Remove(). + Status Remove(); + + /// Get the disk ID that should be used for IO mgr queueing. + int AssignDiskQueue() const; + + const std::string& path() const { return path_; } + bool is_blacklisted() const { return blacklisted_; } + + std::string DebugString(); + + private: + friend class TmpFileMgrTest; + /// The name of the sub-directory that Impala creates within each configured scratch + /// directory. 
+ const static std::string TMP_SUB_DIR_NAME; + + /// Space (in MB) that must ideally be available for writing on a scratch + /// directory. A warning is issued if available space is less than this threshold. + const static uint64_t AVAILABLE_SPACE_THRESHOLD_MB; + + /// The FileGroup this belongs to. Cannot be null. + FileGroup* const file_group_; + + /// Path of the physical file in the filesystem. + const std::string path_; + + /// The temporary device this file is stored on. + const DeviceId device_id_; + + /// The id of the disk on which the physical file lies. + const int disk_id_; + + /// Current bytes allocated in the file. Modified by AllocateSpace(). + int64_t bytes_allocated_; + + /// Set to true to indicate that we shouldn't allocate any more space in this file. + bool blacklisted_; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/tmp-file-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc index 4deefd5..61fd682 100644 --- a/be/src/runtime/tmp-file-mgr-test.cc +++ b/be/src/runtime/tmp-file-mgr-test.cc @@ -19,12 +19,16 @@ #include <boost/filesystem.hpp> #include <boost/scoped_ptr.hpp> +#include <boost/thread/locks.hpp> #include <gtest/gtest.h> #include "common/init.h" +#include "runtime/test-env.h" +#include "runtime/tmp-file-mgr-internal.h" #include "runtime/tmp-file-mgr.h" #include "service/fe-support.h" #include "testutil/gtest-util.h" +#include "util/condition-variable.h" #include "util/filesystem-util.h" #include "util/metrics.h" @@ -37,22 +41,27 @@ using boost::filesystem::path; namespace impala { class TmpFileMgrTest : public ::testing::Test { - protected: + public: virtual void SetUp() { metrics_.reset(new MetricGroup("tmp-file-mgr-test")); profile_ = obj_pool_.Add(new RuntimeProfile(&obj_pool_, "tmp-file-mgr-test")); + test_env_.reset(new TestEnv); + cb_counter_ = 0; } virtual 
void TearDown() { + test_env_.reset(); metrics_.reset(); obj_pool_.Clear(); } + DiskIoMgr* io_mgr() { return test_env_->exec_env()->disk_io_mgr(); } + /// Check that metric values are consistent with TmpFileMgr state. void CheckMetrics(TmpFileMgr* tmp_file_mgr) { - vector<TmpFileMgr::DeviceId> active = tmp_file_mgr->active_tmp_devices(); - IntGauge* active_metric = metrics_->FindMetricForTesting<IntGauge>( - "tmp-file-mgr.active-scratch-dirs"); + vector<TmpFileMgr::DeviceId> active = tmp_file_mgr->ActiveTmpDevices(); + IntGauge* active_metric = + metrics_->FindMetricForTesting<IntGauge>("tmp-file-mgr.active-scratch-dirs"); EXPECT_EQ(active.size(), active_metric->value()); SetMetric<string>* active_set_metric = metrics_->FindMetricForTesting<SetMetric<string>>( @@ -71,54 +80,118 @@ class TmpFileMgrTest : public ::testing::Test { } } - /// Helper to call the private NewFile() method. - static Status NewFile(TmpFileMgr::FileGroup* group, - const TmpFileMgr::DeviceId& device_id, const TUniqueId& query_id, - TmpFileMgr::File** new_file) { - return group->NewFile(device_id, query_id, new_file); + /// Helper to call the private CreateFiles() method and return + /// the created files. + static Status CreateFiles( + TmpFileMgr::FileGroup* group, vector<TmpFileMgr::File*>* files) { + // The method expects the lock to be held. + lock_guard<SpinLock> lock(group->lock_); + RETURN_IF_ERROR(group->CreateFiles()); + for (unique_ptr<TmpFileMgr::File>& file : group->tmp_files_) { + files->push_back(file.get()); + } + return Status::OK(); + } + + /// Helper to call the private TmpFileMgr::NewFile() method. + static Status NewFile(TmpFileMgr* mgr, TmpFileMgr::FileGroup* group, + TmpFileMgr::DeviceId device_id, unique_ptr<TmpFileMgr::File>* new_file) { + return mgr->NewFile(group, device_id, new_file); } - /// Helper to call the private TmpFile::AllocateSpace() method. - static Status AllocateSpace( + /// Helper to call the private File::AllocateSpace() method. 
+ static Status FileAllocateSpace( TmpFileMgr::File* file, int64_t num_bytes, int64_t* offset) { return file->AllocateSpace(num_bytes, offset); } + /// Helper to call the private FileGroup::AllocateSpace() method. + static Status GroupAllocateSpace(TmpFileMgr::FileGroup* group, int64_t num_bytes, + TmpFileMgr::File** file, int64_t* offset) { + return group->AllocateSpace(num_bytes, file, offset); + } + + /// Helper to set FileGroup::next_allocation_index_. + static void SetNextAllocationIndex(TmpFileMgr::FileGroup* group, int value) { + group->next_allocation_index_ = value; + } + + /// Helper to get the # of bytes allocated by the group. Validates that the sum across + /// all files equals this total. + static int64_t BytesAllocated(TmpFileMgr::FileGroup* group) { + int64_t bytes_allocated = 0; + for (unique_ptr<TmpFileMgr::File>& file : group->tmp_files_) { + bytes_allocated += file->bytes_allocated_; + } + EXPECT_EQ(bytes_allocated, group->current_bytes_allocated_); + return bytes_allocated; + } + + // Write callback, which signals 'cb_cv_' and increments 'cb_counter_'. + void SignalCallback(Status write_status) { + { + lock_guard<mutex> lock(cb_cv_lock_); + ++cb_counter_; + } + cb_cv_.NotifyAll(); + } + + /// Wait until 'cb_counter_' reaches 'val'. + void WaitForCallbacks(int64_t val) { + unique_lock<mutex> lock(cb_cv_lock_); + while (cb_counter_ < val) cb_cv_.Wait(lock); + } + ObjectPool obj_pool_; scoped_ptr<MetricGroup> metrics_; // Owned by 'obj_pool_'. RuntimeProfile* profile_; + + /// Used for DiskIoMgr. + scoped_ptr<TestEnv> test_env_; + + // Variables used by SignalCallback(). + mutex cb_cv_lock_; + ConditionVariable cb_cv_; + int64_t cb_counter_; }; /// Regression test for IMPALA-2160. Verify that temporary file manager allocates blocks -/// at the expected file offsets and expands the temporary file to the correct size. +/// at the expected file offsets. 
TEST_F(TmpFileMgrTest, TestFileAllocation) { TmpFileMgr tmp_file_mgr; ASSERT_OK(tmp_file_mgr.Init(metrics_.get())); - TmpFileMgr::FileGroup file_group(&tmp_file_mgr, profile_); + TUniqueId id; + TmpFileMgr::FileGroup file_group( + &tmp_file_mgr, io_mgr(), profile_, id, 1024 * 1024 * 8); + // Default configuration should give us one temporary device. - EXPECT_EQ(1, tmp_file_mgr.num_active_tmp_devices()); - vector<TmpFileMgr::DeviceId> tmp_devices = tmp_file_mgr.active_tmp_devices(); + EXPECT_EQ(1, tmp_file_mgr.NumActiveTmpDevices()); + vector<TmpFileMgr::DeviceId> tmp_devices = tmp_file_mgr.ActiveTmpDevices(); EXPECT_EQ(1, tmp_devices.size()); - TUniqueId id; - TmpFileMgr::File* file; - ASSERT_OK(NewFile(&file_group, tmp_devices[0], id, &file)); - EXPECT_TRUE(file != NULL); + vector<TmpFileMgr::File*> files; + ASSERT_OK(CreateFiles(&file_group, &files)); + EXPECT_EQ(1, files.size()); + TmpFileMgr::File* file = files[0]; // Apply writes of variable sizes and check space was allocated correctly. int64_t write_sizes[] = {1, 10, 1024, 4, 1024 * 1024 * 8, 1024 * 1024 * 8, 16, 10}; int num_write_sizes = sizeof(write_sizes) / sizeof(write_sizes[0]); int64_t next_offset = 0; for (int i = 0; i < num_write_sizes; ++i) { int64_t offset; - ASSERT_OK(AllocateSpace(file, write_sizes[i], &offset)); + ASSERT_OK(FileAllocateSpace(file, write_sizes[i], &offset)); EXPECT_EQ(next_offset, offset); next_offset = offset + write_sizes[i]; - EXPECT_EQ(next_offset, boost::filesystem::file_size(file->path())); } // Check that cleanup is correct. string file_path = file->path(); - file_group.Close(); EXPECT_FALSE(boost::filesystem::exists(file_path)); + + // Check that the file is cleaned up correctly. Need to create file first since + // tmp file is only allocated on writes. 
+ EXPECT_OK(FileSystemUtil::CreateFile(file->path())); + file_group.Close(); + EXPECT_FALSE(boost::filesystem::exists(file->path())); CheckMetrics(&tmp_file_mgr); } @@ -129,15 +202,18 @@ TEST_F(TmpFileMgrTest, TestOneDirPerDevice) { RemoveAndCreateDirs(tmp_dirs); TmpFileMgr tmp_file_mgr; tmp_file_mgr.InitCustom(tmp_dirs, true, metrics_.get()); - TmpFileMgr::FileGroup file_group(&tmp_file_mgr, profile_); + TUniqueId id; + TmpFileMgr::FileGroup file_group( + &tmp_file_mgr, io_mgr(), profile_, id, 1024 * 1024 * 8); // Only the first directory should be used. - EXPECT_EQ(1, tmp_file_mgr.num_active_tmp_devices()); - vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.active_tmp_devices(); + EXPECT_EQ(1, tmp_file_mgr.NumActiveTmpDevices()); + vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.ActiveTmpDevices(); EXPECT_EQ(1, devices.size()); - TUniqueId id; - TmpFileMgr::File* file; - ASSERT_OK(NewFile(&file_group, devices[0], id, &file)); + vector<TmpFileMgr::File*> files; + ASSERT_OK(CreateFiles(&file_group, &files)); + EXPECT_EQ(1, files.size()); + TmpFileMgr::File* file = files[0]; // Check the prefix is the expected temporary directory. EXPECT_EQ(0, file->path().find(tmp_dirs[0])); FileSystemUtil::RemovePaths(tmp_dirs); @@ -151,19 +227,22 @@ TEST_F(TmpFileMgrTest, TestMultiDirsPerDevice) { RemoveAndCreateDirs(tmp_dirs); TmpFileMgr tmp_file_mgr; tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get()); - TmpFileMgr::FileGroup file_group(&tmp_file_mgr, profile_); + TUniqueId id; + TmpFileMgr::FileGroup file_group( + &tmp_file_mgr, io_mgr(), profile_, id, 1024 * 1024 * 8); // Both directories should be used. 
- EXPECT_EQ(2, tmp_file_mgr.num_active_tmp_devices()); - vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.active_tmp_devices(); + EXPECT_EQ(2, tmp_file_mgr.NumActiveTmpDevices()); + vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.ActiveTmpDevices(); EXPECT_EQ(2, devices.size()); - for (int i = 0; i < tmp_dirs.size(); ++i) { + + vector<TmpFileMgr::File*> files; + ASSERT_OK(CreateFiles(&file_group, &files)); + EXPECT_EQ(2, files.size()); + for (int i = 0; i < 2; ++i) { EXPECT_EQ(0, tmp_file_mgr.GetTmpDirPath(devices[i]).find(tmp_dirs[i])); - TUniqueId id; - TmpFileMgr::File* file; - ASSERT_OK(NewFile(&file_group, devices[i], id, &file)); // Check the prefix is the expected temporary directory. - EXPECT_EQ(0, file->path().find(tmp_dirs[i])); + EXPECT_EQ(0, files[i]->path().find(tmp_dirs[i])); } FileSystemUtil::RemovePaths(tmp_dirs); file_group.Close(); @@ -171,79 +250,79 @@ TEST_F(TmpFileMgrTest, TestMultiDirsPerDevice) { } /// Test that reporting a write error is possible but does not result in -/// blacklisting, which is disabled. +/// blacklisting the device. TEST_F(TmpFileMgrTest, TestReportError) { vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"}); RemoveAndCreateDirs(tmp_dirs); TmpFileMgr tmp_file_mgr; tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get()); - TmpFileMgr::FileGroup file_group(&tmp_file_mgr, profile_); + TUniqueId id; + TmpFileMgr::FileGroup file_group( + &tmp_file_mgr, io_mgr(), profile_, id, 1024 * 1024 * 8); // Both directories should be used. - vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.active_tmp_devices(); + vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.ActiveTmpDevices(); EXPECT_EQ(2, devices.size()); CheckMetrics(&tmp_file_mgr); // Inject an error on one device so that we can validate it is handled correctly. 
- TUniqueId id; int good_device = 0, bad_device = 1; - TmpFileMgr::File* bad_file; - ASSERT_OK(NewFile(&file_group, devices[bad_device], id, &bad_file)); + vector<TmpFileMgr::File*> files; + ASSERT_OK(CreateFiles(&file_group, &files)); + ASSERT_EQ(2, files.size()); + TmpFileMgr::File* good_file = files[good_device]; + TmpFileMgr::File* bad_file = files[bad_device]; ErrorMsg errmsg(TErrorCode::GENERAL, "A fake error"); - bad_file->ReportIOError(errmsg); + bad_file->Blacklist(errmsg); - // Blacklisting is disabled. - EXPECT_FALSE(bad_file->is_blacklisted()); - // The second device should still be active. - EXPECT_EQ(2, tmp_file_mgr.num_active_tmp_devices()); - vector<TmpFileMgr::DeviceId> devices_after = tmp_file_mgr.active_tmp_devices(); + // File-level blacklisting is enabled but not device-level. + EXPECT_TRUE(bad_file->is_blacklisted()); + // The bad device should still be active. + EXPECT_EQ(2, tmp_file_mgr.NumActiveTmpDevices()); + vector<TmpFileMgr::DeviceId> devices_after = tmp_file_mgr.ActiveTmpDevices(); EXPECT_EQ(2, devices_after.size()); CheckMetrics(&tmp_file_mgr); // Attempts to expand bad file should succeed. int64_t offset; - ASSERT_OK(AllocateSpace(bad_file, 128, &offset)); + ASSERT_OK(FileAllocateSpace(bad_file, 128, &offset)); // The good device should still be usable. - TmpFileMgr::File* good_file; - ASSERT_OK(NewFile(&file_group, devices[good_device], id, &good_file)); - EXPECT_TRUE(good_file != NULL); - ASSERT_OK(AllocateSpace(good_file, 128, &offset)); + ASSERT_OK(FileAllocateSpace(good_file, 128, &offset)); // Attempts to allocate new files on bad device should succeed. 
- ASSERT_OK(NewFile(&file_group, devices[bad_device], id, &bad_file)); + unique_ptr<TmpFileMgr::File> bad_file2; + ASSERT_OK(NewFile(&tmp_file_mgr, &file_group, bad_device, &bad_file2)); FileSystemUtil::RemovePaths(tmp_dirs); file_group.Close(); CheckMetrics(&tmp_file_mgr); } -TEST_F(TmpFileMgrTest, TestAllocateFails) { - string tmp_dir("/tmp/tmp-file-mgr-test.1"); - string scratch_subdir = tmp_dir + "/impala-scratch"; - vector<string> tmp_dirs({tmp_dir}); +TEST_F(TmpFileMgrTest, TestAllocateNonWritable) { + vector<string> tmp_dirs; + vector<string> scratch_subdirs; + for (int i = 0; i < 2; ++i) { + tmp_dirs.push_back(Substitute("/tmp/tmp-file-mgr-test.$0", i)); + scratch_subdirs.push_back(tmp_dirs[i] + "/impala-scratch"); + } RemoveAndCreateDirs(tmp_dirs); TmpFileMgr tmp_file_mgr; tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get()); - TmpFileMgr::FileGroup file_group(&tmp_file_mgr, profile_); - TUniqueId id; - TmpFileMgr::File* allocated_file1; - TmpFileMgr::File* allocated_file2; + TmpFileMgr::FileGroup file_group( + &tmp_file_mgr, io_mgr(), profile_, id, 1024 * 1024 * 8); + + vector<TmpFileMgr::File*> allocated_files; + ASSERT_OK(CreateFiles(&file_group, &allocated_files)) int64_t offset; - ASSERT_OK(NewFile(&file_group, 0, id, &allocated_file1)); - ASSERT_OK(NewFile(&file_group, 0, id, &allocated_file2)); - ASSERT_OK(AllocateSpace(allocated_file1, 1, &offset)); + ASSERT_OK(FileAllocateSpace(allocated_files[0], 1, &offset)); - // Make scratch non-writable and test for allocation errors at different stages: + // Make scratch non-writable and test allocation at different stages: // new file creation, files with no allocated blocks. files with allocated space. - chmod(scratch_subdir.c_str(), 0); - // allocated_file1 already has space allocated. - EXPECT_FALSE(AllocateSpace(allocated_file1, 1, &offset).ok()); - // allocated_file2 has no space allocated. 
- EXPECT_FALSE(AllocateSpace(allocated_file2, 1, &offset).ok()); - // Creating a new File object can succeed because it is not immediately created on disk. - TmpFileMgr::File* unallocated_file; - ASSERT_OK(NewFile(&file_group, 0, id, &unallocated_file)); - - chmod(scratch_subdir.c_str(), S_IRWXU); + // No errors should be encountered during allocation since allocation is purely logical. + chmod(scratch_subdirs[0].c_str(), 0); + ASSERT_OK(FileAllocateSpace(allocated_files[0], 1, &offset)); + ASSERT_OK(FileAllocateSpace(allocated_files[1], 1, &offset)); + + chmod(scratch_subdirs[0].c_str(), S_IRWXU); FileSystemUtil::RemovePaths(tmp_dirs); file_group.Close(); } @@ -256,51 +335,86 @@ TEST_F(TmpFileMgrTest, TestScratchLimit) { tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get()); const int64_t LIMIT = 100; - const int64_t FILE1_ALLOC = 25; - const int64_t FILE2_ALLOC = LIMIT - FILE1_ALLOC; - TmpFileMgr::FileGroup file_group(&tmp_file_mgr, profile_, LIMIT); - TmpFileMgr::File* file1; - TmpFileMgr::File* file2; + const int64_t ALLOC_SIZE = 50; TUniqueId id; - ASSERT_OK(NewFile(&file_group, 0, id, &file1)); - ASSERT_OK(NewFile(&file_group, 1, id, &file2)); + TmpFileMgr::FileGroup file_group( + &tmp_file_mgr, io_mgr(), profile_, id, ALLOC_SIZE, LIMIT); + + vector<TmpFileMgr::File*> files; + ASSERT_OK(CreateFiles(&file_group, &files)); // Test individual limit is enforced. Status status; int64_t offset; TmpFileMgr::File* alloc_file; - // Alloc from both files should fail. - for (int i = 0; i <= 1; ++i) { - status = file_group.AllocateSpace(LIMIT + 1, &alloc_file, &offset); - ASSERT_FALSE(status.ok()); - ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED); - } // Alloc from file 1 should succeed. - ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED); - ASSERT_OK(file_group.AllocateSpace(FILE1_ALLOC, &alloc_file, &offset)); - ASSERT_EQ(alloc_file, file1); // Should select files round-robin. 
+ SetNextAllocationIndex(&file_group, 0); + ASSERT_OK(GroupAllocateSpace(&file_group, ALLOC_SIZE, &alloc_file, &offset)); + ASSERT_EQ(alloc_file, files[0]); // Should select files round-robin. ASSERT_EQ(0, offset); - // Test aggregate limit is enforced on both files. - for (int i = 0; i <= 1; ++i) { - status = file_group.AllocateSpace(FILE2_ALLOC + 1, &alloc_file, &offset); - ASSERT_FALSE(status.ok()); - ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED); - } - // Allocate up to the max. - ASSERT_OK(file_group.AllocateSpace(FILE2_ALLOC, &alloc_file, &offset)); + ASSERT_OK(GroupAllocateSpace(&file_group, ALLOC_SIZE, &alloc_file, &offset)); ASSERT_EQ(0, offset); - ASSERT_EQ(alloc_file, file2); + ASSERT_EQ(alloc_file, files[1]); - // Test aggregate limit still enforced. - status = file_group.AllocateSpace(1, &alloc_file, &offset); + // Test aggregate limit is enforced. + status = GroupAllocateSpace(&file_group, 1, &alloc_file, &offset); ASSERT_FALSE(status.ok()); ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED); file_group.Close(); } + +// Test that scratch file ranges are recycled as expected. +TEST_F(TmpFileMgrTest, TestScratchRangeRecycling) { + const int64_t ALLOC_SIZE = 50; + TUniqueId id; + TmpFileMgr::FileGroup file_group( + test_env_->tmp_file_mgr(), io_mgr(), profile_, id, ALLOC_SIZE); + + // Generate some data. + const int BLOCKS = 5; + vector<vector<uint8_t>> data(BLOCKS); + for (int i = 0; i < BLOCKS; ++i) { + data[i].resize(ALLOC_SIZE); + std::iota(data[i].begin(), data[i].end(), i); + } + + DiskIoMgr::WriteRange::WriteDoneCallback callback = + bind(mem_fn(&TmpFileMgrTest::SignalCallback), this, _1); + vector<unique_ptr<TmpFileMgr::WriteHandle>> handles(BLOCKS); + // Make sure free space doesn't grow over several iterations. 
+ const int TEST_ITERS = 5; + for (int i = 0; i < TEST_ITERS; ++i) { + cb_counter_ = 0; + for (int j = 0; j < BLOCKS; ++j) { + ASSERT_OK( + file_group.Write(MemRange(data[j].data(), ALLOC_SIZE), callback, &handles[j])); + } + WaitForCallbacks(BLOCKS); + EXPECT_EQ(ALLOC_SIZE * BLOCKS, BytesAllocated(&file_group)); + + // Read back and validate. + for (int j = 0; j < BLOCKS; ++j) { + uint8_t tmp[ALLOC_SIZE]; + ASSERT_OK(file_group.Read(handles[j].get(), MemRange(tmp, ALLOC_SIZE))); + EXPECT_EQ(0, memcmp(tmp, data[j].data(), ALLOC_SIZE)); + file_group.DestroyWriteHandle(move(handles[j])); + } + // Check that the space is still in use - it should be recycled by the next iteration. + EXPECT_EQ(ALLOC_SIZE * BLOCKS, BytesAllocated(&file_group)); + } + + file_group.Close(); + test_env_->TearDownRuntimeStates(); +} } -IMPALA_TEST_MAIN(); +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST); + impala::InitFeSupport(); + return RUN_ALL_TESTS(); +}
