IMPALA-3202: refactor scratch file management into TmpFileMgr

This is a pure refactoring patch that moves all of the logic for allocating scratch file ranges into TmpFileMgr in anticipation of this logic being used by the new BufferPool.
There should be no behavioural changes. Also remove a bunch of TODOs that we're not going to fix. Change-Id: I0c56c195f3f28d520034f8c384494e566635fc62 Reviewed-on: http://gerrit.cloudera.org:8080/4898 Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/46f5ad48 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/46f5ad48 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/46f5ad48 Branch: refs/heads/master Commit: 46f5ad48e38b456d7a2f36e7d4c169f7cfe67a86 Parents: 45ac10a Author: Tim Armstrong <[email protected]> Authored: Wed Oct 26 15:32:55 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Thu Nov 17 21:56:14 2016 +0000 ---------------------------------------------------------------------- be/src/runtime/buffered-block-mgr-test.cc | 404 ++++++++++++------------- be/src/runtime/buffered-block-mgr.cc | 97 ++---- be/src/runtime/buffered-block-mgr.h | 74 ++--- be/src/runtime/tmp-file-mgr-test.cc | 117 ++++--- be/src/runtime/tmp-file-mgr.cc | 84 ++++- be/src/runtime/tmp-file-mgr.h | 97 +++--- 6 files changed, 441 insertions(+), 432 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/46f5ad48/be/src/runtime/buffered-block-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-block-mgr-test.cc b/be/src/runtime/buffered-block-mgr-test.cc index 92c0e89..9f0222d 100644 --- a/be/src/runtime/buffered-block-mgr-test.cc +++ b/be/src/runtime/buffered-block-mgr-test.cc @@ -95,7 +95,7 @@ class BufferedBlockMgrTest : public ::testing::Test { const string& dir = Substitute("/tmp/buffered-block-mgr-test.$0", i); // Fix permissions in case old directories were left from previous runs of test. 
chmod((dir + SCRATCH_SUFFIX).c_str(), S_IRWXU); - EXPECT_TRUE(FileSystemUtil::RemoveAndCreateDirectory(dir).ok()); + EXPECT_OK(FileSystemUtil::RemoveAndCreateDirectory(dir)); tmp_dirs.push_back(dir); created_tmp_dirs_.push_back(dir); } @@ -105,8 +105,8 @@ class BufferedBlockMgrTest : public ::testing::Test { } static void ValidateBlock(BufferedBlockMgr::Block* block, int32_t data) { - EXPECT_EQ(block->valid_data_len(), sizeof(int32_t)); - EXPECT_EQ(*reinterpret_cast<int32_t*>(block->buffer()), data); + ASSERT_EQ(block->valid_data_len(), sizeof(int32_t)); + ASSERT_EQ(*reinterpret_cast<int32_t*>(block->buffer()), data); } static int32_t* MakeRandomSizeData(BufferedBlockMgr::Block* block) { @@ -128,13 +128,13 @@ class BufferedBlockMgrTest : public ::testing::Test { int32_t bsize = *(reinterpret_cast<int32_t*>(block->buffer())); uint8_t* data = reinterpret_cast<uint8_t*>(block->buffer()); int i; - EXPECT_EQ(block->valid_data_len(), size); - EXPECT_EQ(size, bsize); - for (i = 4; i < size-5; ++i) { - EXPECT_EQ(data[i], i); + ASSERT_EQ(block->valid_data_len(), size); + ASSERT_EQ(size, bsize); + for (i = 4; i < size - 5; ++i) { + ASSERT_EQ(data[i], i); } for (; i < size; ++i) { - EXPECT_EQ(data[i], 0xff); + ASSERT_EQ(data[i], 0xff); } } @@ -142,8 +142,8 @@ class BufferedBlockMgrTest : public ::testing::Test { BufferedBlockMgr* CreateMgr(int64_t query_id, int max_buffers, int block_size, RuntimeState** query_state = NULL, TQueryOptions* query_options = NULL) { RuntimeState* state; - EXPECT_TRUE(test_env_->CreateQueryState(query_id, max_buffers, block_size, - &state, query_options).ok()); + EXPECT_OK(test_env_->CreateQueryState( + query_id, max_buffers, block_size, &state, query_options)); if (query_state != NULL) *query_state = state; return state->block_mgr(); } @@ -158,8 +158,8 @@ class BufferedBlockMgrTest : public ::testing::Test { BufferedBlockMgr::Client** client, RuntimeState** query_state = NULL, TQueryOptions* query_options = NULL) { RuntimeState* state; - 
BufferedBlockMgr* mgr = CreateMgr(query_id, max_buffers, block_size, &state, - query_options); + BufferedBlockMgr* mgr = + CreateMgr(query_id, max_buffers, block_size, &state, query_options); MemTracker* client_tracker = NewClientTracker(state); EXPECT_OK(mgr->RegisterClient(Substitute("Client for query $0", query_id), @@ -194,8 +194,8 @@ class BufferedBlockMgrTest : public ::testing::Test { Status status; BufferedBlockMgr::Block* new_block; for (int i = 0; i < num_blocks; ++i) { - EXPECT_OK(block_mgr->GetNewBlock(client, NULL, &new_block)); - EXPECT_TRUE(new_block != NULL); + ASSERT_OK(block_mgr->GetNewBlock(client, NULL, &new_block)); + ASSERT_TRUE(new_block != NULL); data = new_block->Allocate<int32_t>(sizeof(int32_t)); *data = blocks->size(); blocks->push_back(new_block); @@ -206,8 +206,8 @@ class BufferedBlockMgrTest : public ::testing::Test { void PinBlocks(const vector<BufferedBlockMgr::Block*>& blocks) { for (int i = 0; i < blocks.size(); ++i) { bool pinned; - EXPECT_OK(blocks[i]->Pin(&pinned)); - EXPECT_TRUE(pinned); + ASSERT_OK(blocks[i]->Pin(&pinned)); + ASSERT_TRUE(pinned); } } @@ -218,9 +218,9 @@ class BufferedBlockMgrTest : public ::testing::Test { for (int i = 0; i < blocks.size(); ++i) { Status status = blocks[i]->Unpin(); if (expect_cancelled) { - EXPECT_TRUE(status.ok() || status.IsCancelled()) << status.msg().msg(); + ASSERT_TRUE(status.ok() || status.IsCancelled()) << status.msg().msg(); } else { - EXPECT_TRUE(status.ok()) << status.msg().msg(); + ASSERT_TRUE(status.ok()) << status.msg().msg(); } } } @@ -250,7 +250,7 @@ class BufferedBlockMgrTest : public ::testing::Test { SleepForMs(WRITE_CHECK_INTERVAL_MILLIS); if (AllWritesComplete(block_mgrs)) return; } - EXPECT_TRUE(false) << "Writes did not complete after " << WRITE_WAIT_MILLIS << "ms"; + ASSERT_TRUE(false) << "Writes did not complete after " << WRITE_WAIT_MILLIS << "ms"; } static bool AllWritesComplete(BufferedBlockMgr* block_mgr) { @@ -270,8 +270,8 @@ class BufferedBlockMgrTest : public 
::testing::Test { // should fail. Expects backing file has already been allocated. static void DeleteBackingFile(BufferedBlockMgr::Block* block) { const string& path = block->TmpFilePath(); - EXPECT_GT(path.size(), 0); - EXPECT_TRUE(remove(path)); + ASSERT_GT(path.size(), 0); + ASSERT_TRUE(remove(path)); LOG(INFO) << "Injected fault by deleting file " << path; } @@ -297,7 +297,7 @@ class BufferedBlockMgrTest : public ::testing::Test { BufferedBlockMgr* block_mgr; BufferedBlockMgr::Client* client; block_mgr = CreateMgrAndClient(0, max_num_blocks, block_size, 0, false, &client); - EXPECT_EQ(test_env_->TotalQueryMemoryConsumption(), 0); + ASSERT_EQ(test_env_->TotalQueryMemoryConsumption(), 0); // Allocate blocks until max_num_blocks, they should all succeed and memory // usage should go up. @@ -305,32 +305,32 @@ class BufferedBlockMgrTest : public ::testing::Test { BufferedBlockMgr::Block* first_block = NULL; for (int i = 0; i < max_num_blocks; ++i) { status = block_mgr->GetNewBlock(client, NULL, &new_block); - EXPECT_TRUE(new_block != NULL); - EXPECT_EQ(block_mgr->bytes_allocated(), (i + 1) * block_size); + ASSERT_TRUE(new_block != NULL); + ASSERT_EQ(block_mgr->bytes_allocated(), (i + 1) * block_size); if (first_block == NULL) first_block = new_block; blocks.push_back(new_block); } // Trying to allocate a new one should fail. - status = block_mgr->GetNewBlock(client, NULL, &new_block); - EXPECT_TRUE(new_block == NULL); - EXPECT_EQ(block_mgr->bytes_allocated(), max_num_blocks * block_size); + ASSERT_OK(block_mgr->GetNewBlock(client, NULL, &new_block)); + ASSERT_TRUE(new_block == NULL); + ASSERT_EQ(block_mgr->bytes_allocated(), max_num_blocks * block_size); // We can allocate a new block by transferring an already allocated one. 
uint8_t* old_buffer = first_block->buffer(); - status = block_mgr->GetNewBlock(client, first_block, &new_block); - EXPECT_TRUE(new_block != NULL); - EXPECT_EQ(old_buffer, new_block->buffer()); - EXPECT_EQ(block_mgr->bytes_allocated(), max_num_blocks * block_size); - EXPECT_TRUE(!first_block->is_pinned()); + ASSERT_OK(block_mgr->GetNewBlock(client, first_block, &new_block)); + ASSERT_TRUE(new_block != NULL); + ASSERT_EQ(old_buffer, new_block->buffer()); + ASSERT_EQ(block_mgr->bytes_allocated(), max_num_blocks * block_size); + ASSERT_TRUE(!first_block->is_pinned()); blocks.push_back(new_block); // Trying to allocate a new one should still fail. - status = block_mgr->GetNewBlock(client, NULL, &new_block); - EXPECT_TRUE(new_block == NULL); - EXPECT_EQ(block_mgr->bytes_allocated(), max_num_blocks * block_size); + ASSERT_OK(block_mgr->GetNewBlock(client, NULL, &new_block)); + ASSERT_TRUE(new_block == NULL); + ASSERT_EQ(block_mgr->bytes_allocated(), max_num_blocks * block_size); - EXPECT_EQ(block_mgr->writes_issued(), 1); + ASSERT_EQ(block_mgr->writes_issued(), 1); DeleteBlocks(blocks); TearDownMgrs(); @@ -350,32 +350,32 @@ class BufferedBlockMgrTest : public ::testing::Test { vector<BufferedBlockMgr::Block*> blocks; AllocateBlocks(block_mgr, client, max_num_buffers, &blocks); - EXPECT_EQ(block_mgr->bytes_allocated(), max_num_buffers * block_size); - for (BufferedBlockMgr::Block* block: blocks) block->Unpin(); + ASSERT_EQ(block_mgr->bytes_allocated(), max_num_buffers * block_size); + for (BufferedBlockMgr::Block* block : blocks) block->Unpin(); // Re-pinning all blocks for (int i = 0; i < blocks.size(); ++i) { bool pinned; - EXPECT_OK(blocks[i]->Pin(&pinned)); - EXPECT_TRUE(pinned); + ASSERT_OK(blocks[i]->Pin(&pinned)); + ASSERT_TRUE(pinned); ValidateBlock(blocks[i], i); } int buffered_pins_expected = blocks.size(); - EXPECT_EQ(buffered_pin->value(), buffered_pins_expected); + ASSERT_EQ(buffered_pin->value(), buffered_pins_expected); // Unpin all blocks - for 
(BufferedBlockMgr::Block* block: blocks) block->Unpin(); + for (BufferedBlockMgr::Block* block : blocks) block->Unpin(); // Get two new blocks. AllocateBlocks(block_mgr, client, 2, &blocks); // At least two writes must be issued. The first (num_blocks - 2) must be in memory. - EXPECT_GE(block_mgr->writes_issued(), 2); + ASSERT_GE(block_mgr->writes_issued(), 2); for (int i = 0; i < (max_num_buffers - 2); ++i) { bool pinned; - EXPECT_OK(blocks[i]->Pin(&pinned)); - EXPECT_TRUE(pinned); + ASSERT_OK(blocks[i]->Pin(&pinned)); + ASSERT_TRUE(pinned); ValidateBlock(blocks[i], i); } - EXPECT_GE(buffered_pin->value(), buffered_pins_expected); + ASSERT_GE(buffered_pin->value(), buffered_pins_expected); DeleteBlocks(blocks); TearDownMgrs(); } @@ -400,9 +400,9 @@ class BufferedBlockMgrTest : public ::testing::Test { ApiFunction api_function; BufferedBlockMgr::Client* client; - EXPECT_OK( + ASSERT_OK( block_mgr->RegisterClient("", 0, false, NewClientTracker(state), state, &client)); - EXPECT_TRUE(client != NULL); + ASSERT_TRUE(client != NULL); pinned_blocks.reserve(num_buffers); BufferedBlockMgr::Block* new_block; @@ -435,12 +435,12 @@ class BufferedBlockMgrTest : public ::testing::Test { case New: status = block_mgr->GetNewBlock(client, NULL, &new_block); if (close_called || (tid != SINGLE_THREADED_TID && status.IsCancelled())) { - EXPECT_TRUE(new_block == NULL); - EXPECT_TRUE(status.IsCancelled()); + ASSERT_TRUE(new_block == NULL); + ASSERT_TRUE(status.IsCancelled()); continue; } - EXPECT_OK(status); - EXPECT_TRUE(new_block != NULL); + ASSERT_OK(status); + ASSERT_TRUE(new_block != NULL); data = MakeRandomSizeData(new_block); block_data = make_pair(new_block, *data); @@ -452,17 +452,17 @@ class BufferedBlockMgrTest : public ::testing::Test { block_data = unpinned_blocks[rand_pick]; status = block_data.first->Pin(&pinned); if (close_called || (tid != SINGLE_THREADED_TID && status.IsCancelled())) { - EXPECT_TRUE(status.IsCancelled()); + ASSERT_TRUE(status.IsCancelled()); // In 
single-threaded runs the block should not have been pinned. // In multi-threaded runs Pin() may return the block pinned but the status to // be cancelled. In this case we could move the block from unpinned_blocks // to pinned_blocks. We do not do that because after IsCancelled() no actual // block operations should take place. - if (tid == SINGLE_THREADED_TID) EXPECT_FALSE(pinned); + if (tid == SINGLE_THREADED_TID) ASSERT_FALSE(pinned); continue; } - EXPECT_OK(status); - EXPECT_TRUE(pinned); + ASSERT_OK(status); + ASSERT_TRUE(pinned); ValidateRandomSizeData(block_data.first, block_data.second); unpinned_blocks[rand_pick] = unpinned_blocks.back(); unpinned_blocks.pop_back(); @@ -476,10 +476,10 @@ class BufferedBlockMgrTest : public ::testing::Test { block_data = pinned_blocks[rand_pick]; status = block_data.first->Unpin(); if (close_called || (tid != SINGLE_THREADED_TID && status.IsCancelled())) { - EXPECT_TRUE(status.IsCancelled()); + ASSERT_TRUE(status.IsCancelled()); continue; } - EXPECT_OK(status); + ASSERT_OK(status); pinned_blocks[rand_pick] = pinned_blocks.back(); pinned_blocks.pop_back(); pinned_block_map[pinned_blocks[rand_pick].first] = rand_pick; @@ -599,50 +599,50 @@ TEST_F(BufferedBlockMgrTest, GetNewBlockSmallBlocks) { BufferedBlockMgr::Client* client; block_mgr = CreateMgrAndClient(0, max_num_blocks, block_size, 0, false, &client); MemTracker* client_tracker = block_mgr->get_tracker(client); - EXPECT_EQ(0, test_env_->TotalQueryMemoryConsumption()); + ASSERT_EQ(0, test_env_->TotalQueryMemoryConsumption()); vector<BufferedBlockMgr::Block*> blocks; // Allocate a small block. 
BufferedBlockMgr::Block* new_block = NULL; - EXPECT_OK(block_mgr->GetNewBlock(client, NULL, &new_block, 128)); - EXPECT_TRUE(new_block != NULL); - EXPECT_EQ(block_mgr->bytes_allocated(), 0); - EXPECT_EQ(block_mgr->mem_tracker()->consumption(), 0); - EXPECT_EQ(client_tracker->consumption(), 128); - EXPECT_TRUE(new_block->is_pinned()); - EXPECT_EQ(new_block->BytesRemaining(), 128); - EXPECT_TRUE(new_block->buffer() != NULL); + ASSERT_OK(block_mgr->GetNewBlock(client, NULL, &new_block, 128)); + ASSERT_TRUE(new_block != NULL); + ASSERT_EQ(block_mgr->bytes_allocated(), 0); + ASSERT_EQ(block_mgr->mem_tracker()->consumption(), 0); + ASSERT_EQ(client_tracker->consumption(), 128); + ASSERT_TRUE(new_block->is_pinned()); + ASSERT_EQ(new_block->BytesRemaining(), 128); + ASSERT_TRUE(new_block->buffer() != NULL); blocks.push_back(new_block); // Allocate a normal block - EXPECT_OK(block_mgr->GetNewBlock(client, NULL, &new_block)); - EXPECT_TRUE(new_block != NULL); - EXPECT_EQ(block_mgr->bytes_allocated(), block_mgr->max_block_size()); - EXPECT_EQ(block_mgr->mem_tracker()->consumption(), block_mgr->max_block_size()); - EXPECT_EQ(client_tracker->consumption(), 128 + block_mgr->max_block_size()); - EXPECT_TRUE(new_block->is_pinned()); - EXPECT_EQ(new_block->BytesRemaining(), block_mgr->max_block_size()); - EXPECT_TRUE(new_block->buffer() != NULL); + ASSERT_OK(block_mgr->GetNewBlock(client, NULL, &new_block)); + ASSERT_TRUE(new_block != NULL); + ASSERT_EQ(block_mgr->bytes_allocated(), block_mgr->max_block_size()); + ASSERT_EQ(block_mgr->mem_tracker()->consumption(), block_mgr->max_block_size()); + ASSERT_EQ(client_tracker->consumption(), 128 + block_mgr->max_block_size()); + ASSERT_TRUE(new_block->is_pinned()); + ASSERT_EQ(new_block->BytesRemaining(), block_mgr->max_block_size()); + ASSERT_TRUE(new_block->buffer() != NULL); blocks.push_back(new_block); // Allocate another small block. 
- EXPECT_OK(block_mgr->GetNewBlock(client, NULL, &new_block, 512)); - EXPECT_TRUE(new_block != NULL); - EXPECT_EQ(block_mgr->bytes_allocated(), block_mgr->max_block_size()); - EXPECT_EQ(block_mgr->mem_tracker()->consumption(), block_mgr->max_block_size()); - EXPECT_EQ(client_tracker->consumption(), 128 + 512 + block_mgr->max_block_size()); - EXPECT_TRUE(new_block->is_pinned()); - EXPECT_EQ(new_block->BytesRemaining(), 512); - EXPECT_TRUE(new_block->buffer() != NULL); + ASSERT_OK(block_mgr->GetNewBlock(client, NULL, &new_block, 512)); + ASSERT_TRUE(new_block != NULL); + ASSERT_EQ(block_mgr->bytes_allocated(), block_mgr->max_block_size()); + ASSERT_EQ(block_mgr->mem_tracker()->consumption(), block_mgr->max_block_size()); + ASSERT_EQ(client_tracker->consumption(), 128 + 512 + block_mgr->max_block_size()); + ASSERT_TRUE(new_block->is_pinned()); + ASSERT_EQ(new_block->BytesRemaining(), 512); + ASSERT_TRUE(new_block->buffer() != NULL); blocks.push_back(new_block); // Should be able to unpin and pin the middle block - EXPECT_OK(blocks[1]->Unpin()); + ASSERT_OK(blocks[1]->Unpin()); bool pinned; - EXPECT_OK(blocks[1]->Pin(&pinned)); - EXPECT_TRUE(pinned); + ASSERT_OK(blocks[1]->Pin(&pinned)); + ASSERT_TRUE(pinned); DeleteBlocks(blocks); TearDownMgrs(); @@ -661,7 +661,7 @@ TEST_F(BufferedBlockMgrTest, Pin) { // Unpin them all. for (int i = 0; i < blocks.size(); ++i) { - EXPECT_OK(blocks[i]->Unpin()); + ASSERT_OK(blocks[i]->Unpin()); } // Allocate more, this should work since we just unpinned some blocks. @@ -669,23 +669,23 @@ TEST_F(BufferedBlockMgrTest, Pin) { // Try to pin a unpinned block, this should not be possible. bool pinned; - EXPECT_OK(blocks[0]->Pin(&pinned)); - EXPECT_FALSE(pinned); + ASSERT_OK(blocks[0]->Pin(&pinned)); + ASSERT_FALSE(pinned); // Unpin all blocks. for (int i = 0; i < blocks.size(); ++i) { - EXPECT_OK(blocks[i]->Unpin()); + ASSERT_OK(blocks[i]->Unpin()); } // Should be able to pin max_num_blocks blocks. 
for (int i = 0; i < max_num_blocks; ++i) { - EXPECT_OK(blocks[i]->Pin(&pinned)); - EXPECT_TRUE(pinned); + ASSERT_OK(blocks[i]->Pin(&pinned)); + ASSERT_TRUE(pinned); } // Can't pin any more though. - EXPECT_OK(blocks[max_num_blocks]->Pin(&pinned)); - EXPECT_FALSE(pinned); + ASSERT_OK(blocks[max_num_blocks]->Pin(&pinned)); + ASSERT_FALSE(pinned); DeleteBlocks(blocks); TearDownMgrs(); @@ -713,13 +713,13 @@ TEST_F(BufferedBlockMgrTest, Deletion) { vector<BufferedBlockMgr::Block*> blocks; AllocateBlocks(block_mgr, client, max_num_buffers, &blocks); - EXPECT_EQ(created_cnt->value(), max_num_buffers); + ASSERT_EQ(created_cnt->value(), max_num_buffers); DeleteBlocks(blocks); blocks.clear(); AllocateBlocks(block_mgr, client, max_num_buffers, &blocks); - EXPECT_EQ(created_cnt->value(), max_num_buffers); - EXPECT_EQ(recycled_cnt->value(), max_num_buffers); + ASSERT_EQ(created_cnt->value(), max_num_buffers); + ASSERT_EQ(recycled_cnt->value(), max_num_buffers); DeleteBlocks(blocks); TearDownMgrs(); @@ -736,43 +736,43 @@ TEST_F(BufferedBlockMgrTest, DeleteSingleBlocks) { // Pinned I/O block. BufferedBlockMgr::Block* new_block; - EXPECT_OK(block_mgr->GetNewBlock(client, NULL, &new_block)); - EXPECT_TRUE(new_block != NULL); - EXPECT_TRUE(new_block->is_pinned()); - EXPECT_TRUE(new_block->is_max_size()); + ASSERT_OK(block_mgr->GetNewBlock(client, NULL, &new_block)); + ASSERT_TRUE(new_block != NULL); + ASSERT_TRUE(new_block->is_pinned()); + ASSERT_TRUE(new_block->is_max_size()); new_block->Delete(); - EXPECT_EQ(0, client_tracker->consumption()); + ASSERT_EQ(0, client_tracker->consumption()); // Pinned non-I/O block. 
int small_block_size = 128; - EXPECT_OK(block_mgr->GetNewBlock(client, NULL, &new_block, small_block_size)); - EXPECT_TRUE(new_block != NULL); - EXPECT_TRUE(new_block->is_pinned()); - EXPECT_EQ(small_block_size, client_tracker->consumption()); + ASSERT_OK(block_mgr->GetNewBlock(client, NULL, &new_block, small_block_size)); + ASSERT_TRUE(new_block != NULL); + ASSERT_TRUE(new_block->is_pinned()); + ASSERT_EQ(small_block_size, client_tracker->consumption()); new_block->Delete(); - EXPECT_EQ(0, client_tracker->consumption()); + ASSERT_EQ(0, client_tracker->consumption()); // Unpinned I/O block - delete after written to disk. - EXPECT_OK(block_mgr->GetNewBlock(client, NULL, &new_block)); - EXPECT_TRUE(new_block != NULL); - EXPECT_TRUE(new_block->is_pinned()); - EXPECT_TRUE(new_block->is_max_size()); + ASSERT_OK(block_mgr->GetNewBlock(client, NULL, &new_block)); + ASSERT_TRUE(new_block != NULL); + ASSERT_TRUE(new_block->is_pinned()); + ASSERT_TRUE(new_block->is_max_size()); new_block->Unpin(); - EXPECT_FALSE(new_block->is_pinned()); + ASSERT_FALSE(new_block->is_pinned()); WaitForWrites(block_mgr); new_block->Delete(); - EXPECT_EQ(client_tracker->consumption(), 0); + ASSERT_EQ(client_tracker->consumption(), 0); // Unpinned I/O block - delete before written to disk. - EXPECT_OK(block_mgr->GetNewBlock(client, NULL, &new_block)); - EXPECT_TRUE(new_block != NULL); - EXPECT_TRUE(new_block->is_pinned()); - EXPECT_TRUE(new_block->is_max_size()); + ASSERT_OK(block_mgr->GetNewBlock(client, NULL, &new_block)); + ASSERT_TRUE(new_block != NULL); + ASSERT_TRUE(new_block->is_pinned()); + ASSERT_TRUE(new_block->is_max_size()); new_block->Unpin(); - EXPECT_FALSE(new_block->is_pinned()); + ASSERT_FALSE(new_block->is_pinned()); new_block->Delete(); WaitForWrites(block_mgr); - EXPECT_EQ(client_tracker->consumption(), 0); + ASSERT_EQ(client_tracker->consumption(), 0); TearDownMgrs(); } @@ -812,7 +812,7 @@ TEST_F(BufferedBlockMgrTest, TransferBufferDuringWrite) { // number of buffers. 
int reserved_buffers = trial % max_num_buffers; BufferedBlockMgr::Client* tmp_client; - EXPECT_OK(block_mgr->RegisterClient("tmp_client", reserved_buffers, false, + ASSERT_OK(block_mgr->RegisterClient("tmp_client", reserved_buffers, false, NewClientTracker(query_state), query_state, &tmp_client)); BufferedBlockMgr::Block* tmp_block; ASSERT_OK(block_mgr->GetNewBlock(tmp_client, NULL, &tmp_block)); @@ -848,13 +848,13 @@ TEST_F(BufferedBlockMgrTest, Close) { BufferedBlockMgr::Block* new_block; Status status = block_mgr->GetNewBlock(client, NULL, &new_block); - EXPECT_TRUE(status.IsCancelled()); - EXPECT_TRUE(new_block == NULL); + ASSERT_TRUE(status.IsCancelled()); + ASSERT_TRUE(new_block == NULL); status = blocks[0]->Unpin(); - EXPECT_TRUE(status.IsCancelled()); + ASSERT_TRUE(status.IsCancelled()); bool pinned; status = blocks[0]->Pin(&pinned); - EXPECT_TRUE(status.IsCancelled()); + ASSERT_TRUE(status.IsCancelled()); DeleteBlocks(blocks); TearDownMgrs(); @@ -930,7 +930,7 @@ void BufferedBlockMgrTest::TestRuntimeStateTeardown( if (wait_for_writes) WaitForWrites(block_mgr.get()); block_mgr.reset(); - EXPECT_EQ(test_env_->TotalQueryMemoryConsumption(), 0); + ASSERT_EQ(test_env_->TotalQueryMemoryConsumption(), 0); } TEST_F(BufferedBlockMgrTest, RuntimeStateTeardown) { @@ -998,14 +998,14 @@ TEST_F(BufferedBlockMgrTest, WriteError) { PinBlocks(blocks); // Remove the backing storage so that future writes will fail int num_files = clear_scratch_dir(); - EXPECT_GT(num_files, 0); + ASSERT_GT(num_files, 0); UnpinBlocks(blocks, true); WaitForWrites(block_mgr); // Subsequent calls should fail. 
DeleteBlocks(blocks); BufferedBlockMgr::Block* new_block; - EXPECT_TRUE(block_mgr->GetNewBlock(client, NULL, &new_block).IsCancelled()); - EXPECT_TRUE(new_block == NULL); + ASSERT_TRUE(block_mgr->GetNewBlock(client, NULL, &new_block).IsCancelled()); + ASSERT_TRUE(new_block == NULL); TearDownMgrs(); } @@ -1021,16 +1021,16 @@ TEST_F(BufferedBlockMgrTest, TmpFileAllocateError) { vector<BufferedBlockMgr::Block*> blocks; AllocateBlocks(block_mgr, client, max_num_buffers, &blocks); // Unpin a block, forcing a write. - EXPECT_OK(blocks[0]->Unpin()); + ASSERT_OK(blocks[0]->Unpin()); WaitForWrites(block_mgr); // Remove temporary files - subsequent operations will fail. int num_files = clear_scratch_dir(); - EXPECT_GT(num_files, 0); + ASSERT_GT(num_files, 0); // Current implementation will fail here because it tries to expand the tmp file // immediately. This behavior is not contractual but we want to know if it changes // accidentally. Status status = blocks[1]->Unpin(); - EXPECT_FALSE(status.ok()); + ASSERT_FALSE(status.ok()); DeleteBlocks(blocks); TearDownMgrs(); @@ -1078,24 +1078,24 @@ TEST_F(BufferedBlockMgrTest, DISABLED_WriteErrorBlacklist) { DeleteBackingFile(error_block); UnpinBlocks(all_blocks); // Should succeed since tmp file space was already allocated. WaitForWrites(block_mgrs); - EXPECT_TRUE(block_mgrs[error_mgr]->IsCancelled()); - EXPECT_FALSE(block_mgrs[no_error_mgr]->IsCancelled()); + ASSERT_TRUE(block_mgrs[error_mgr]->IsCancelled()); + ASSERT_FALSE(block_mgrs[no_error_mgr]->IsCancelled()); // Temporary device with error should no longer be active. 
vector<TmpFileMgr::DeviceId> active_tmp_devices = - test_env_->tmp_file_mgr()->active_tmp_devices(); - EXPECT_EQ(tmp_dirs.size() - 1, active_tmp_devices.size()); + test_env_->tmp_file_mgr()->active_tmp_devices(); + ASSERT_EQ(tmp_dirs.size() - 1, active_tmp_devices.size()); for (int i = 0; i < active_tmp_devices.size(); ++i) { - const string& device_path = test_env_->tmp_file_mgr()->GetTmpDirPath( - active_tmp_devices[i]); - EXPECT_EQ(string::npos, error_dir.find(device_path)); + const string& device_path = + test_env_->tmp_file_mgr()->GetTmpDirPath(active_tmp_devices[i]); + ASSERT_EQ(string::npos, error_dir.find(device_path)); } // The second block manager should continue using allocated scratch space, since it // didn't encounter a write error itself. In future this could change but for now it is // the intended behaviour. PinBlocks(blocks[no_error_mgr]); UnpinBlocks(blocks[no_error_mgr]); - EXPECT_TRUE(FindBlockForDir(blocks[no_error_mgr], good_dir) != NULL); - EXPECT_TRUE(FindBlockForDir(blocks[no_error_mgr], error_dir) != NULL); + ASSERT_TRUE(FindBlockForDir(blocks[no_error_mgr], good_dir) != NULL); + ASSERT_TRUE(FindBlockForDir(blocks[no_error_mgr], error_dir) != NULL); // The second block manager should avoid using bad directory for new blocks. vector<BufferedBlockMgr::Block*> no_error_new_blocks; AllocateBlocks(block_mgrs[no_error_mgr], clients[no_error_mgr], blocks_per_mgr, @@ -1104,7 +1104,7 @@ TEST_F(BufferedBlockMgrTest, DISABLED_WriteErrorBlacklist) { for (int i = 0; i < no_error_new_blocks.size(); ++i) { LOG(INFO) << "Newly created block backed by file " << no_error_new_blocks[i]->TmpFilePath(); - EXPECT_TRUE(BlockInDir(no_error_new_blocks[i], good_dir)); + ASSERT_TRUE(BlockInDir(no_error_new_blocks[i], good_dir)); } // A new block manager should only use the good dir for backing storage. 
BufferedBlockMgr::Client* new_client; @@ -1116,7 +1116,7 @@ TEST_F(BufferedBlockMgrTest, DISABLED_WriteErrorBlacklist) { for (int i = 0; i < blocks_per_mgr; ++i) { LOG(INFO) << "New manager Block " << i << " backed by file " << new_mgr_blocks[i]->TmpFilePath(); - EXPECT_TRUE(BlockInDir(new_mgr_blocks[i], good_dir)); + ASSERT_TRUE(BlockInDir(new_mgr_blocks[i], good_dir)); } } @@ -1151,7 +1151,7 @@ TEST_F(BufferedBlockMgrTest, AllocationErrorHandling) { // use the good dir. UnpinBlocks(blocks[0]); // Directories remain on active list even when they experience errors. - EXPECT_EQ(2, test_env_->tmp_file_mgr()->num_active_tmp_devices()); + ASSERT_EQ(2, test_env_->tmp_file_mgr()->num_active_tmp_devices()); // Blocks should not be written to bad dir even if it remains non-writable. UnpinBlocks(blocks[1]); // All writes should succeed. @@ -1175,7 +1175,7 @@ TEST_F(BufferedBlockMgrTest, NoDirsAllocationError) { chmod(tmp_scratch_subdir.c_str(), 0); } for (int i = 0; i < blocks.size(); ++i) { - EXPECT_FALSE(blocks[i]->Unpin().ok()); + ASSERT_FALSE(blocks[i]->Unpin().ok()); } DeleteBlocks(blocks); } @@ -1217,18 +1217,18 @@ TEST_F(BufferedBlockMgrTest, MultipleClients) { BufferedBlockMgr::Client* client1 = NULL; BufferedBlockMgr::Client* client2 = NULL; - EXPECT_OK(block_mgr->RegisterClient("", client1_buffers, false, + ASSERT_OK(block_mgr->RegisterClient("", client1_buffers, false, NewClientTracker(runtime_state), runtime_state, &client1)); - EXPECT_TRUE(client1 != NULL); - EXPECT_OK(block_mgr->RegisterClient("", client2_buffers, false, + ASSERT_TRUE(client1 != NULL); + ASSERT_OK(block_mgr->RegisterClient("", client2_buffers, false, NewClientTracker(runtime_state), runtime_state, &client2)); - EXPECT_TRUE(client2 != NULL); + ASSERT_TRUE(client2 != NULL); // Reserve client 1's and 2's buffers. They should succeed. 
bool reserved = block_mgr->TryAcquireTmpReservation(client1, 1); - EXPECT_TRUE(reserved); + ASSERT_TRUE(reserved); reserved = block_mgr->TryAcquireTmpReservation(client2, 1); - EXPECT_TRUE(reserved); + ASSERT_TRUE(reserved); vector<BufferedBlockMgr::Block*> client1_blocks; // Allocate all of client1's reserved blocks, they should all succeed. @@ -1236,71 +1236,71 @@ TEST_F(BufferedBlockMgrTest, MultipleClients) { // Try allocating one more, that should fail. BufferedBlockMgr::Block* block; - EXPECT_OK(block_mgr->GetNewBlock(client1, NULL, &block)); - EXPECT_TRUE(block == NULL); + ASSERT_OK(block_mgr->GetNewBlock(client1, NULL, &block)); + ASSERT_TRUE(block == NULL); // Trying to reserve should also fail. reserved = block_mgr->TryAcquireTmpReservation(client1, 1); - EXPECT_FALSE(reserved); + ASSERT_FALSE(reserved); // Allocate all of client2's reserved blocks, these should succeed. vector<BufferedBlockMgr::Block*> client2_blocks; AllocateBlocks(block_mgr, client2, client2_buffers, &client2_blocks); // Try allocating one more from client 2, that should fail. - EXPECT_OK(block_mgr->GetNewBlock(client2, NULL, &block)); - EXPECT_TRUE(block == NULL); + ASSERT_OK(block_mgr->GetNewBlock(client2, NULL, &block)); + ASSERT_TRUE(block == NULL); // Unpin one block from client 1. - EXPECT_OK(client1_blocks[0]->Unpin()); + ASSERT_OK(client1_blocks[0]->Unpin()); // Client 2 should still not be able to allocate. - EXPECT_OK(block_mgr->GetNewBlock(client2, NULL, &block)); - EXPECT_TRUE(block == NULL); + ASSERT_OK(block_mgr->GetNewBlock(client2, NULL, &block)); + ASSERT_TRUE(block == NULL); // Client 2 should still not be able to reserve. reserved = block_mgr->TryAcquireTmpReservation(client2, 1); - EXPECT_FALSE(reserved); + ASSERT_FALSE(reserved); // Client 1 should be able to though. 
- EXPECT_OK(block_mgr->GetNewBlock(client1, NULL, &block)); - EXPECT_TRUE(block != NULL); + ASSERT_OK(block_mgr->GetNewBlock(client1, NULL, &block)); + ASSERT_TRUE(block != NULL); client1_blocks.push_back(block); // Unpin two of client 1's blocks (client 1 should have 3 unpinned blocks now). - EXPECT_OK(client1_blocks[1]->Unpin()); - EXPECT_OK(client1_blocks[2]->Unpin()); + ASSERT_OK(client1_blocks[1]->Unpin()); + ASSERT_OK(client1_blocks[2]->Unpin()); // Clear client 1's reservation block_mgr->ClearReservations(client1); // Client 2 should be able to reserve 1 buffers now (there are 2 left); reserved = block_mgr->TryAcquireTmpReservation(client2, 1); - EXPECT_TRUE(reserved); + ASSERT_TRUE(reserved); // Client one can only pin 1. bool pinned; - EXPECT_OK(client1_blocks[0]->Pin(&pinned)); - EXPECT_TRUE(pinned); + ASSERT_OK(client1_blocks[0]->Pin(&pinned)); + ASSERT_TRUE(pinned); // Can't get this one. - EXPECT_OK(client1_blocks[1]->Pin(&pinned)); - EXPECT_FALSE(pinned); + ASSERT_OK(client1_blocks[1]->Pin(&pinned)); + ASSERT_FALSE(pinned); // Client 2 can pick up the one reserved buffer - EXPECT_OK(block_mgr->GetNewBlock(client2, NULL, &block)); - EXPECT_TRUE(block != NULL); + ASSERT_OK(block_mgr->GetNewBlock(client2, NULL, &block)); + ASSERT_TRUE(block != NULL); client2_blocks.push_back(block); // But not a second BufferedBlockMgr::Block* block2; - EXPECT_OK(block_mgr->GetNewBlock(client2, NULL, &block2)); - EXPECT_TRUE(block2 == NULL); + ASSERT_OK(block_mgr->GetNewBlock(client2, NULL, &block2)); + ASSERT_TRUE(block2 == NULL); // Unpin client 2's block it got from the reservation. Sine this is a tmp // reservation, client 1 can pick it up again (it is not longer reserved). 
- EXPECT_OK(block->Unpin()); - EXPECT_OK(client1_blocks[1]->Pin(&pinned)); - EXPECT_TRUE(pinned); + ASSERT_OK(block->Unpin()); + ASSERT_OK(client1_blocks[1]->Pin(&pinned)); + ASSERT_TRUE(pinned); DeleteBlocks(client1_blocks); DeleteBlocks(client2_blocks); @@ -1319,12 +1319,12 @@ TEST_F(BufferedBlockMgrTest, MultipleClientsExtraBuffers) { BufferedBlockMgr::Client* client1 = NULL; BufferedBlockMgr::Client* client2 = NULL; BufferedBlockMgr::Block* block = NULL; - EXPECT_OK(block_mgr->RegisterClient("", client1_buffers, false, + ASSERT_OK(block_mgr->RegisterClient("", client1_buffers, false, NewClientTracker(runtime_state), runtime_state, &client1)); - EXPECT_TRUE(client1 != NULL); - EXPECT_OK(block_mgr->RegisterClient("", client2_buffers, false, + ASSERT_TRUE(client1 != NULL); + ASSERT_OK(block_mgr->RegisterClient("", client2_buffers, false, NewClientTracker(runtime_state), runtime_state, &client2)); - EXPECT_TRUE(client2 != NULL); + ASSERT_TRUE(client2 != NULL); vector<BufferedBlockMgr::Block*> client1_blocks; // Allocate all of client1's reserved blocks, they should all succeed. @@ -1335,18 +1335,18 @@ TEST_F(BufferedBlockMgrTest, MultipleClientsExtraBuffers) { AllocateBlocks(block_mgr, client2, client2_buffers, &client2_blocks); // We have two spare buffers now. Each client should be able to allocate it. - EXPECT_OK(block_mgr->GetNewBlock(client1, NULL, &block)); - EXPECT_TRUE(block != NULL); + ASSERT_OK(block_mgr->GetNewBlock(client1, NULL, &block)); + ASSERT_TRUE(block != NULL); client1_blocks.push_back(block); - EXPECT_OK(block_mgr->GetNewBlock(client2, NULL, &block)); - EXPECT_TRUE(block != NULL); + ASSERT_OK(block_mgr->GetNewBlock(client2, NULL, &block)); + ASSERT_TRUE(block != NULL); client2_blocks.push_back(block); // Now we are completely full, no one should be able to allocate a new block. 
- EXPECT_OK(block_mgr->GetNewBlock(client1, NULL, &block)); - EXPECT_TRUE(block == NULL); - EXPECT_OK(block_mgr->GetNewBlock(client2, NULL, &block)); - EXPECT_TRUE(block == NULL); + ASSERT_OK(block_mgr->GetNewBlock(client1, NULL, &block)); + ASSERT_TRUE(block == NULL); + ASSERT_OK(block_mgr->GetNewBlock(client2, NULL, &block)); + ASSERT_TRUE(block == NULL); DeleteBlocks(client1_blocks); DeleteBlocks(client2_blocks); @@ -1369,39 +1369,39 @@ TEST_F(BufferedBlockMgrTest, ClientOversubscription) { BufferedBlockMgr::Client* client2 = NULL; BufferedBlockMgr::Client* client3 = NULL; BufferedBlockMgr::Block* block = NULL; - EXPECT_OK(block_mgr->RegisterClient("", client1_buffers, false, + ASSERT_OK(block_mgr->RegisterClient("", client1_buffers, false, NewClientTracker(runtime_state), runtime_state, &client1)); - EXPECT_TRUE(client1 != NULL); - EXPECT_OK(block_mgr->RegisterClient("", client2_buffers, false, + ASSERT_TRUE(client1 != NULL); + ASSERT_OK(block_mgr->RegisterClient("", client2_buffers, false, NewClientTracker(runtime_state), runtime_state, &client2)); - EXPECT_TRUE(client2 != NULL); - EXPECT_OK(block_mgr->RegisterClient("", client3_buffers, true, + ASSERT_TRUE(client2 != NULL); + ASSERT_OK(block_mgr->RegisterClient("", client3_buffers, true, NewClientTracker(runtime_state), runtime_state, &client3)); - EXPECT_TRUE(client3 != NULL); + ASSERT_TRUE(client3 != NULL); // Client one allocates first block, should work. - EXPECT_OK(block_mgr->GetNewBlock(client1, NULL, &block)); - EXPECT_TRUE(block != NULL); + ASSERT_OK(block_mgr->GetNewBlock(client1, NULL, &block)); + ASSERT_TRUE(block != NULL); blocks.push_back(block); // Client two allocates first block, should work. - EXPECT_OK(block_mgr->GetNewBlock(client2, NULL, &block)); - EXPECT_TRUE(block != NULL); + ASSERT_OK(block_mgr->GetNewBlock(client2, NULL, &block)); + ASSERT_TRUE(block != NULL); blocks.push_back(block); // At this point we've used both buffers. 
Client one reserved one so subsequent // calls should fail with no error (but returns no block). - EXPECT_OK(block_mgr->GetNewBlock(client1, NULL, &block)); - EXPECT_TRUE(block == NULL); + ASSERT_OK(block_mgr->GetNewBlock(client1, NULL, &block)); + ASSERT_TRUE(block == NULL); // Allocate with client two. Since client two reserved 2 buffers, this should fail // with MEM_LIMIT_EXCEEDED. - EXPECT_TRUE(block_mgr->GetNewBlock(client2, NULL, &block).IsMemLimitExceeded()); + ASSERT_TRUE(block_mgr->GetNewBlock(client2, NULL, &block).IsMemLimitExceeded()); // Allocate with client three. Since client three can tolerate oversubscription, // this should fail with no error even though it was a reserved request. - EXPECT_OK(block_mgr->GetNewBlock(client3, NULL, &block)); - EXPECT_TRUE(block == NULL); + ASSERT_OK(block_mgr->GetNewBlock(client3, NULL, &block)); + ASSERT_TRUE(block == NULL); DeleteBlocks(blocks); TearDownMgrs(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/46f5ad48/be/src/runtime/buffered-block-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc index b7a48a9..0c3d25f 100644 --- a/be/src/runtime/buffered-block-mgr.cc +++ b/be/src/runtime/buffered-block-mgr.cc @@ -209,25 +209,24 @@ BufferedBlockMgr::BufferedBlockMgr(RuntimeState* state, TmpFileMgr* tmp_file_mgr : max_block_size_(block_size), // Keep two writes in flight per scratch disk so the disks can stay busy. 
block_write_threshold_(tmp_file_mgr->num_active_tmp_devices() * 2), - disable_spill_(state->query_ctx().disable_spilling || block_write_threshold_ == 0 || - scratch_limit == 0), + disable_spill_(state->query_ctx().disable_spilling || block_write_threshold_ == 0 + || scratch_limit == 0), query_id_(state->query_id()), - tmp_file_mgr_(tmp_file_mgr), initialized_(false), unfullfilled_reserved_buffers_(0), total_pinned_buffers_(0), non_local_outstanding_writes_(0), - tmp_file_group(new TmpFileMgr::FileGroup(tmp_file_mgr, scratch_limit)), + tmp_file_group_(NULL), io_mgr_(state->io_mgr()), is_cancelled_(false), writes_issued_(0), - debug_write_delay_ms_(0) { -} + debug_write_delay_ms_(0) {} Status BufferedBlockMgr::Create(RuntimeState* state, MemTracker* parent, RuntimeProfile* profile, TmpFileMgr* tmp_file_mgr, int64_t mem_limit, int64_t block_size, shared_ptr<BufferedBlockMgr>* block_mgr) { DCHECK(parent != NULL); + int64_t scratch_limit = state->query_options().scratch_limit; block_mgr->reset(); { lock_guard<SpinLock> lock(static_block_mgrs_lock_); @@ -238,12 +237,13 @@ Status BufferedBlockMgr::Create(RuntimeState* state, MemTracker* parent, // all shared_ptr references have gone to 0 and it is in the process of // being deleted. This can happen if the last shared reference is released // but before the weak ptr is removed from the map. 
- block_mgr->reset(new BufferedBlockMgr(state, tmp_file_mgr, block_size, - state->query_options().scratch_limit)); + block_mgr->reset( + new BufferedBlockMgr(state, tmp_file_mgr, block_size, scratch_limit)); query_to_block_mgrs_[state->query_id()] = *block_mgr; } } - (*block_mgr)->Init(state->io_mgr(), profile, parent, mem_limit); + (*block_mgr) + ->Init(state->io_mgr(), tmp_file_mgr, profile, parent, mem_limit, scratch_limit); return Status::OK(); } @@ -276,7 +276,6 @@ Status BufferedBlockMgr::RegisterClient(const string& debug_info, void BufferedBlockMgr::ClearReservations(Client* client) { lock_guard<mutex> lock(lock_); - // TODO: Can the modifications to the client's mem variables can be made w/o the lock? if (client->num_pinned_buffers_ < client->num_reserved_buffers_) { unfullfilled_reserved_buffers_ -= client->num_reserved_buffers_ - client->num_pinned_buffers_; @@ -289,7 +288,6 @@ void BufferedBlockMgr::ClearReservations(Client* client) { bool BufferedBlockMgr::TryAcquireTmpReservation(Client* client, int num_buffers) { lock_guard<mutex> lock(lock_); - // TODO: Can the modifications to the client's mem variables can be made w/o the lock? DCHECK_EQ(client->num_tmp_reserved_buffers_, 0); if (client->num_pinned_buffers_ < client->num_reserved_buffers_) { // If client has unused reserved buffers, we use those first. @@ -430,7 +428,6 @@ Status BufferedBlockMgr::GetNewBlock(Client* client, Block* unpin_block, Block** if (len > 0 && len < max_block_size_) { DCHECK(unpin_block == NULL); if (client->tracker_->TryConsume(len)) { - // TODO: Have a cache of unused blocks of size 'len' (0, max_block_size_) uint8_t* buffer = new uint8_t[len]; // Descriptors for non-I/O sized buffers are deleted when the block is deleted. new_block->buffer_desc_ = new BufferDescriptor(buffer, len); @@ -558,7 +555,7 @@ BufferedBlockMgr::~BufferedBlockMgr() { // See IMPALA-1890. DCHECK_EQ(non_local_outstanding_writes_, 0) << endl << DebugInternal(); // Delete tmp files. 
- tmp_file_group->Close(); + tmp_file_group_->Close(); // Validate that clients deleted all of their blocks. Since all writes have // completed at this point, any deleted blocks should be in unused_blocks_. @@ -594,8 +591,6 @@ MemTracker* BufferedBlockMgr::get_tracker(Client* client) const { return client->tracker_; } -// TODO: It would be good if we had a sync primitive that supports is_mine() calls, see -// IMPALA-1884. Status BufferedBlockMgr::DeleteOrUnpinBlock(Block* block, bool unpin) { if (block == NULL) { return IsCancelled() ? Status::CANCELLED : Status::OK(); @@ -746,12 +741,15 @@ Status BufferedBlockMgr::WriteUnpinnedBlock(Block* block) { DCHECK_EQ(block->buffer_desc_->len, max_block_size_); if (block->write_range_ == NULL) { - if (tmp_file_group->NumFiles() == 0) RETURN_IF_ERROR(InitTmpFiles()); + if (tmp_file_group_->NumFiles() == 0) { + RETURN_IF_ERROR(tmp_file_group_->CreateFiles(query_id_)); + } // First time the block is being persisted - need to allocate tmp file space. TmpFileMgr::File* tmp_file; int64_t file_offset; - RETURN_IF_ERROR(AllocateScratchSpace(max_block_size_, &tmp_file, &file_offset)); + RETURN_IF_ERROR( + tmp_file_group_->AllocateSpace(max_block_size_, &tmp_file, &file_offset)); int disk_id = tmp_file->disk_id(); if (disk_id < 0) { // Assign a valid disk id to the write range if the tmp file was not assigned one. @@ -799,38 +797,6 @@ void BufferedBlockMgr::WaitForWrite(unique_lock<mutex>& lock, Block* block) { } } -Status BufferedBlockMgr::AllocateScratchSpace(int64_t block_size, - TmpFileMgr::File** tmp_file, int64_t* file_offset) { - // Assumes block manager lock is already taken. - vector<Status> errs; - // Find the next physical file in round-robin order and create a write range for it. 
- for (int attempt = 0; attempt < tmp_file_group->NumFiles(); ++attempt) { - *tmp_file = tmp_file_group->GetFileAt(next_block_index_); - next_block_index_ = (next_block_index_ + 1) % tmp_file_group->NumFiles(); - if ((*tmp_file)->is_blacklisted()) continue; - Status status = (*tmp_file)->AllocateSpace(block_size, file_offset); - if (status.ok()) { - scratch_space_bytes_used_counter_->Add(block_size); - return Status::OK(); - } else if (status.code() == TErrorCode::SCRATCH_LIMIT_EXCEEDED) { - // We cannot allocate from any files if we're at the scratch limit. - return status; - } - // Log error and try other files if there was a problem. Problematic files will be - // blacklisted so we will not repeatedly log the same error. - LOG(WARNING) << "Error while allocating range in scratch file '" - << (*tmp_file)->path() << "': " << status.msg().msg() - << ". Will try another scratch file."; - errs.push_back(status); - } - Status err_status("No usable scratch files: space could not be allocated in any " - "of the configured scratch directories (--scratch_dirs)."); - for (int i = 0; i < errs.size(); ++i) { - err_status.MergeStatus(errs[i]); - } - return err_status; -} - void BufferedBlockMgr::WriteComplete(Block* block, const Status& write_status) { #ifndef NDEBUG if (debug_write_delay_ms_ > 0) { @@ -1271,8 +1237,9 @@ string BufferedBlockMgr::DebugInternal() const { return ss.str(); } -void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, RuntimeProfile* parent_profile, - MemTracker* parent_tracker, int64_t mem_limit) { +void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, TmpFileMgr* tmp_file_mgr, + RuntimeProfile* parent_profile, MemTracker* parent_tracker, int64_t mem_limit, + int64_t scratch_limit) { unique_lock<mutex> l(lock_); if (initialized_) return; @@ -1281,6 +1248,9 @@ void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, RuntimeProfile* parent_profile, profile_.reset(new RuntimeProfile(&obj_pool_, "BlockMgr")); parent_profile->AddChild(profile_.get()); + tmp_file_group_.reset( + 
new TmpFileMgr::FileGroup(tmp_file_mgr, profile_.get(), scratch_limit)); + mem_limit_counter_ = ADD_COUNTER(profile_.get(), "MemoryLimit", TUnit::BYTES); mem_limit_counter_->Set(mem_limit); block_size_counter_ = ADD_COUNTER(profile_.get(), "MaxBlockSize", TUnit::BYTES); @@ -1294,8 +1264,6 @@ void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, RuntimeProfile* parent_profile, disk_read_timer_ = ADD_TIMER(profile_.get(), "TotalReadBlockTime"); buffer_wait_timer_ = ADD_TIMER(profile_.get(), "TotalBufferWaitTime"); encryption_timer_ = ADD_TIMER(profile_.get(), "TotalEncryptionTime"); - scratch_space_bytes_used_counter_ = - ADD_COUNTER(profile_.get(), "ScratchFileUsedBytes", TUnit::BYTES); // Create a new mem_tracker and allocate buffers. mem_tracker_.reset( @@ -1304,29 +1272,6 @@ void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, RuntimeProfile* parent_profile, initialized_ = true; } -Status BufferedBlockMgr::InitTmpFiles() { - DCHECK(tmp_file_group->NumFiles() == 0); - DCHECK(tmp_file_mgr_ != NULL); - - vector<TmpFileMgr::DeviceId> tmp_devices = tmp_file_mgr_->active_tmp_devices(); - int files_allocated = 0; - // Initialize the tmp files and the initial file to use. - for (int i = 0; i < tmp_devices.size(); ++i) { - TmpFileMgr::DeviceId tmp_device_id = tmp_devices[i]; - // It is possible for a device to be blacklisted after it was returned by - // active_tmp_devices(), handle this gracefully by ignoring the return status of - // NewFile(). - if (tmp_file_group->NewFile(tmp_device_id, query_id_).ok()) ++files_allocated; - } - DCHECK_EQ(tmp_file_group->NumFiles(), files_allocated); - if (tmp_file_group->NumFiles() == 0) { - return Status("No spilling directories configured. Cannot spill. 
Set --scratch_dirs" - " or see log for previous errors that prevented use of provided directories"); - } - next_block_index_ = rand() % tmp_file_group->NumFiles(); - return Status::OK(); -} - Status BufferedBlockMgr::EncryptAndHash(Block* block, uint8_t** outbuf) { DCHECK(FLAGS_disk_spill_encryption); DCHECK(block->buffer()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/46f5ad48/be/src/runtime/buffered-block-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-block-mgr.h b/be/src/runtime/buffered-block-mgr.h index 7b9d487..269b707 100644 --- a/be/src/runtime/buffered-block-mgr.h +++ b/be/src/runtime/buffered-block-mgr.h @@ -79,16 +79,8 @@ class RuntimeState; /// used simultaneously by multiple clients in any capacity. /// However, the block manager client is not thread-safe. That is, the block manager /// allows multiple single-threaded block manager clients. -// -/// TODO: When a block is read from disk, data is copied from the IOMgr buffer to the -/// block manager's buffer. This should be avoided in the common case where these buffers -/// are of the same size. -/// TODO: See if the one big lock is a bottleneck. Break it up. This object is shared by -/// all operators within a query (across fragments), see IMPALA-1883. -/// TODO: No reason we can't spill the smaller buffers. Add it if we need to (it's likely -/// just removing dchecks). -/// TODO: The requirements on this object has grown organically. Consider a major -/// reworking. +/// +/// TODO: replace with BufferPool. class BufferedBlockMgr { private: struct BufferDescriptor; @@ -100,8 +92,6 @@ class BufferedBlockMgr { /// The remaining memory that is not reserved by any clients is free for all and /// available to all clients. /// This is an opaque handle. - /// TODO: move the APIs to client we don't need to pass the BufferedBlockMgr around. - /// TODO: how can we ensure that each operator uses a separate client? 
struct Client; /// A fixed-size block of data that may be be persisted to disk. The state of the block @@ -267,7 +257,6 @@ class BufferedBlockMgr { /// Block state variables. The block's buffer can be freed only if is_pinned_ and /// in_write_ are both false. - /// TODO: this might be better expressed as an enum. /// is_pinned_ is true while the block is pinned by a client. bool is_pinned_; @@ -282,8 +271,6 @@ class BufferedBlockMgr { /// Condition variable to wait for the write to this block to finish. If 'in_write_' /// is true, notify_one() will eventually be called on this condition variable. Only /// on thread should wait on this cv at a time. - /// TODO: Currently we use block_mgr_->lock_ for this condvar. There is no reason to - /// use that lock_ that is already overloaded, see IMPALA-1883. boost::condition_variable write_complete_cv_; /// If true, this block is being written out so the underlying buffer can be @@ -302,21 +289,20 @@ class BufferedBlockMgr { ~BufferedBlockMgr(); - /// Registers a client with num_reserved_buffers. The returned client is owned + /// Registers a client with 'num_reserved_buffers'. The returned client is owned /// by the BufferedBlockMgr and has the same lifetime as it. /// We allow oversubscribing the reserved buffers. It is likely that the - /// num_reserved_buffers will be very pessimistic for small queries and we don't want to + /// 'num_reserved_buffers' will be very pessimistic for small queries and we don't want + /// to /// fail all of them with mem limit exceeded. /// The min reserved buffers is often independent of data size and we still want /// to run small queries with very small limits. /// Buffers used by this client are reflected in tracker. - /// tolerates_oversubscription determines how oversubscription is handled. If true, + /// 'tolerates_oversubscription' determines how oversubscription is handled. If true, /// failure to allocate a reserved buffer is not an error. 
If false, failure to /// allocate a reserved buffer is a MEM_LIMIT_EXCEEDED error. - /// debug_info is a string that will be printed in debug messages and errors to + /// 'debug_info' is a string that will be printed in debug messages and errors to /// identify the client. - /// TODO: The fact that we allow oversubscription is problematic. - /// as some code expects the reservations to always be granted (currently not the case). Status RegisterClient(const std::string& debug_info, int num_reserved_buffers, bool tolerates_oversubscription, MemTracker* tracker, RuntimeState* state, Client** client); @@ -365,8 +351,9 @@ class BufferedBlockMgr { /// TryAcquireTmpReservation may be used to fulfill the request if available. If the /// request is unsuccessful, that temporary buffer space is not consumed. /// Returns false if there was not enough memory. - /// TODO: this is added specifically to support the Buckets structure in the hash table - /// which does not map well to Blocks. Revisit this. + /// + /// This is used only for the Buckets structure in the hash table, which cannot be + /// segmented into blocks. bool ConsumeMemory(Client* client, int64_t size); /// All successful allocates bytes from ConsumeMemory() must have a corresponding @@ -410,21 +397,15 @@ class BufferedBlockMgr { /// Iterator into all_io_buffers_ for this buffer. std::list<BufferDescriptor*>::iterator all_buffers_it; - BufferDescriptor(uint8_t* buf, int64_t len) - : buffer(buf), len(len), block(NULL) { - } + BufferDescriptor(uint8_t* buf, int64_t len) : buffer(buf), len(len), block(NULL) {} }; - BufferedBlockMgr(RuntimeState* state, TmpFileMgr* tmp_file_mgr, - int64_t block_size, int64_t scratch_limit); + BufferedBlockMgr(RuntimeState* state, TmpFileMgr* tmp_file_mgr, int64_t block_size, + int64_t scratch_limit); /// Initializes the block mgr. Idempotent and thread-safe. 
- void Init(DiskIoMgr* io_mgr, RuntimeProfile* profile, - MemTracker* parent_tracker, int64_t mem_limit); - - /// Initializes tmp_files_. This is initialized the first time we need to write to disk. - /// Must be called with lock_ taken. - Status InitTmpFiles(); + void Init(DiskIoMgr* io_mgr, TmpFileMgr* tmp_file_mgr, RuntimeProfile* profile, + MemTracker* parent_tracker, int64_t mem_limit, int64_t scratch_limit); /// PinBlock(), UnpinBlock(), DeleteBlock() perform the actual work of Block::Pin(), /// Unpin() and Delete(). DeleteBlock() must be called without the lock_ taken and @@ -484,12 +465,6 @@ class BufferedBlockMgr { /// 'lock_' must be held with 'lock'. void WaitForWrite(boost::unique_lock<boost::mutex>& lock, Block* block); - /// Allocate block_size bytes in a temporary file. Try multiple disks if error occurs. - /// Returns an error only if no temporary files are usable or the scratch limit is - /// exceeded. - Status AllocateScratchSpace(int64_t block_size, TmpFileMgr::File** tmp_file, - int64_t* file_offset); - /// Callback used by DiskIoMgr to indicate a block write has completed. write_status /// is the status of the write. is_cancelled_ is set to true if write_status is not /// Status::OK or a re-issue of the write fails. Returns the block's buffer to the @@ -528,14 +503,10 @@ class BufferedBlockMgr { /// Track buffers allocated by the block manager. boost::scoped_ptr<MemTracker> mem_tracker_; - /// The temporary file manager used to allocate temporary file space. - TmpFileMgr* tmp_file_mgr_; - /// This lock protects the block and buffer lists below, except for unused_blocks_. - /// It also protects the various counters and changes to block state. Additionally, it is - /// used for the blocking condvars: buffer_available_cv_ and block->write_complete_cv_. - /// TODO: We should break the protection of the various structures and usages to - /// different spinlocks and a mutex to be used in the wait()s, see IMPALA-1883. 
+ /// It also protects the various counters and changes to block state. Additionally, it + /// is used for the blocking condvars: buffer_available_cv_ and + /// block->write_complete_cv_. boost::mutex lock_; /// If true, Init() has been called. @@ -586,11 +557,7 @@ class BufferedBlockMgr { /// Group of temporary physical files, (one per tmp device) to which /// blocks may be written. Blocks are round-robined across these files. - boost::scoped_ptr<TmpFileMgr::FileGroup> tmp_file_group; - - /// Index into 'tmp_file_group_' denoting the file to which the next block will be - /// written. - int next_block_index_; + boost::scoped_ptr<TmpFileMgr::FileGroup> tmp_file_group_; /// DiskIoMgr handles to read and write blocks. DiskIoMgr* io_mgr_; @@ -632,9 +599,6 @@ class BufferedBlockMgr { /// Time spent in disk spill encryption, decryption, and integrity checking. RuntimeProfile::Counter* encryption_timer_; - /// Amount of scratch space allocated in bytes. - RuntimeProfile::Counter* scratch_space_bytes_used_counter_; - /// Number of writes issued. int writes_issued_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/46f5ad48/be/src/runtime/tmp-file-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc index 828e637..4deefd5 100644 --- a/be/src/runtime/tmp-file-mgr-test.cc +++ b/be/src/runtime/tmp-file-mgr-test.cc @@ -40,10 +40,12 @@ class TmpFileMgrTest : public ::testing::Test { protected: virtual void SetUp() { metrics_.reset(new MetricGroup("tmp-file-mgr-test")); + profile_ = obj_pool_.Add(new RuntimeProfile(&obj_pool_, "tmp-file-mgr-test")); } virtual void TearDown() { metrics_.reset(); + obj_pool_.Clear(); } /// Check that metric values are consistent with TmpFileMgr state. @@ -69,7 +71,23 @@ class TmpFileMgrTest : public ::testing::Test { } } + /// Helper to call the private NewFile() method. 
+ static Status NewFile(TmpFileMgr::FileGroup* group, + const TmpFileMgr::DeviceId& device_id, const TUniqueId& query_id, + TmpFileMgr::File** new_file) { + return group->NewFile(device_id, query_id, new_file); + } + + /// Helper to call the private TmpFile::AllocateSpace() method. + static Status AllocateSpace( + TmpFileMgr::File* file, int64_t num_bytes, int64_t* offset) { + return file->AllocateSpace(num_bytes, offset); + } + + ObjectPool obj_pool_; scoped_ptr<MetricGroup> metrics_; + // Owned by 'obj_pool_'. + RuntimeProfile* profile_; }; /// Regression test for IMPALA-2160. Verify that temporary file manager allocates blocks @@ -77,24 +95,22 @@ class TmpFileMgrTest : public ::testing::Test { TEST_F(TmpFileMgrTest, TestFileAllocation) { TmpFileMgr tmp_file_mgr; ASSERT_OK(tmp_file_mgr.Init(metrics_.get())); - TmpFileMgr::FileGroup file_group(&tmp_file_mgr); + TmpFileMgr::FileGroup file_group(&tmp_file_mgr, profile_); // Default configuration should give us one temporary device. EXPECT_EQ(1, tmp_file_mgr.num_active_tmp_devices()); vector<TmpFileMgr::DeviceId> tmp_devices = tmp_file_mgr.active_tmp_devices(); EXPECT_EQ(1, tmp_devices.size()); TUniqueId id; - TmpFileMgr::File *file; - ASSERT_OK(file_group.NewFile(tmp_devices[0], id, &file)); + TmpFileMgr::File* file; + ASSERT_OK(NewFile(&file_group, tmp_devices[0], id, &file)); EXPECT_TRUE(file != NULL); // Apply writes of variable sizes and check space was allocated correctly. 
- int64_t write_sizes[] = { - 1, 10, 1024, 4, 1024 * 1024 * 8, 1024 * 1024 * 8, 16, 10 - }; - int num_write_sizes = sizeof(write_sizes)/sizeof(write_sizes[0]); + int64_t write_sizes[] = {1, 10, 1024, 4, 1024 * 1024 * 8, 1024 * 1024 * 8, 16, 10}; + int num_write_sizes = sizeof(write_sizes) / sizeof(write_sizes[0]); int64_t next_offset = 0; for (int i = 0; i < num_write_sizes; ++i) { int64_t offset; - ASSERT_OK(file->AllocateSpace(write_sizes[i], &offset)); + ASSERT_OK(AllocateSpace(file, write_sizes[i], &offset)); EXPECT_EQ(next_offset, offset); next_offset = offset + write_sizes[i]; EXPECT_EQ(next_offset, boost::filesystem::file_size(file->path())); @@ -113,15 +129,15 @@ TEST_F(TmpFileMgrTest, TestOneDirPerDevice) { RemoveAndCreateDirs(tmp_dirs); TmpFileMgr tmp_file_mgr; tmp_file_mgr.InitCustom(tmp_dirs, true, metrics_.get()); - TmpFileMgr::FileGroup file_group(&tmp_file_mgr); + TmpFileMgr::FileGroup file_group(&tmp_file_mgr, profile_); // Only the first directory should be used. EXPECT_EQ(1, tmp_file_mgr.num_active_tmp_devices()); vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.active_tmp_devices(); EXPECT_EQ(1, devices.size()); TUniqueId id; - TmpFileMgr::File *file; - ASSERT_OK(file_group.NewFile(devices[0], id, &file)); + TmpFileMgr::File* file; + ASSERT_OK(NewFile(&file_group, devices[0], id, &file)); // Check the prefix is the expected temporary directory. EXPECT_EQ(0, file->path().find(tmp_dirs[0])); FileSystemUtil::RemovePaths(tmp_dirs); @@ -135,7 +151,7 @@ TEST_F(TmpFileMgrTest, TestMultiDirsPerDevice) { RemoveAndCreateDirs(tmp_dirs); TmpFileMgr tmp_file_mgr; tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get()); - TmpFileMgr::FileGroup file_group(&tmp_file_mgr); + TmpFileMgr::FileGroup file_group(&tmp_file_mgr, profile_); // Both directories should be used. 
EXPECT_EQ(2, tmp_file_mgr.num_active_tmp_devices()); @@ -144,8 +160,8 @@ TEST_F(TmpFileMgrTest, TestMultiDirsPerDevice) { for (int i = 0; i < tmp_dirs.size(); ++i) { EXPECT_EQ(0, tmp_file_mgr.GetTmpDirPath(devices[i]).find(tmp_dirs[i])); TUniqueId id; - TmpFileMgr::File *file; - ASSERT_OK(file_group.NewFile(devices[i], id, &file)); + TmpFileMgr::File* file; + ASSERT_OK(NewFile(&file_group, devices[i], id, &file)); // Check the prefix is the expected temporary directory. EXPECT_EQ(0, file->path().find(tmp_dirs[i])); } @@ -161,7 +177,7 @@ TEST_F(TmpFileMgrTest, TestReportError) { RemoveAndCreateDirs(tmp_dirs); TmpFileMgr tmp_file_mgr; tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get()); - TmpFileMgr::FileGroup file_group(&tmp_file_mgr); + TmpFileMgr::FileGroup file_group(&tmp_file_mgr, profile_); // Both directories should be used. vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.active_tmp_devices(); @@ -172,7 +188,7 @@ TEST_F(TmpFileMgrTest, TestReportError) { TUniqueId id; int good_device = 0, bad_device = 1; TmpFileMgr::File* bad_file; - ASSERT_OK(file_group.NewFile(devices[bad_device], id, &bad_file)); + ASSERT_OK(NewFile(&file_group, devices[bad_device], id, &bad_file)); ErrorMsg errmsg(TErrorCode::GENERAL, "A fake error"); bad_file->ReportIOError(errmsg); @@ -186,14 +202,14 @@ TEST_F(TmpFileMgrTest, TestReportError) { // Attempts to expand bad file should succeed. int64_t offset; - ASSERT_OK(bad_file->AllocateSpace(128, &offset)); + ASSERT_OK(AllocateSpace(bad_file, 128, &offset)); // The good device should still be usable. TmpFileMgr::File* good_file; - ASSERT_OK(file_group.NewFile(devices[good_device], id, &good_file)); + ASSERT_OK(NewFile(&file_group, devices[good_device], id, &good_file)); EXPECT_TRUE(good_file != NULL); - ASSERT_OK(good_file->AllocateSpace(128, &offset)); + ASSERT_OK(AllocateSpace(good_file, 128, &offset)); // Attempts to allocate new files on bad device should succeed. 
- ASSERT_OK(file_group.NewFile(devices[bad_device], id, &bad_file)); + ASSERT_OK(NewFile(&file_group, devices[bad_device], id, &bad_file)); FileSystemUtil::RemovePaths(tmp_dirs); file_group.Close(); CheckMetrics(&tmp_file_mgr); @@ -206,26 +222,26 @@ TEST_F(TmpFileMgrTest, TestAllocateFails) { RemoveAndCreateDirs(tmp_dirs); TmpFileMgr tmp_file_mgr; tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get()); - TmpFileMgr::FileGroup file_group(&tmp_file_mgr); + TmpFileMgr::FileGroup file_group(&tmp_file_mgr, profile_); TUniqueId id; TmpFileMgr::File* allocated_file1; TmpFileMgr::File* allocated_file2; int64_t offset; - ASSERT_OK(file_group.NewFile(0, id, &allocated_file1)); - ASSERT_OK(file_group.NewFile(0, id, &allocated_file2)); - ASSERT_OK(allocated_file1->AllocateSpace(1, &offset)); + ASSERT_OK(NewFile(&file_group, 0, id, &allocated_file1)); + ASSERT_OK(NewFile(&file_group, 0, id, &allocated_file2)); + ASSERT_OK(AllocateSpace(allocated_file1, 1, &offset)); // Make scratch non-writable and test for allocation errors at different stages: // new file creation, files with no allocated blocks. files with allocated space. chmod(scratch_subdir.c_str(), 0); // allocated_file1 already has space allocated. - EXPECT_FALSE(allocated_file1->AllocateSpace(1, &offset).ok()); + EXPECT_FALSE(AllocateSpace(allocated_file1, 1, &offset).ok()); // allocated_file2 has no space allocated. - EXPECT_FALSE(allocated_file2->AllocateSpace(1, &offset).ok()); + EXPECT_FALSE(AllocateSpace(allocated_file2, 1, &offset).ok()); // Creating a new File object can succeed because it is not immediately created on disk. 
TmpFileMgr::File* unallocated_file; - ASSERT_OK(file_group.NewFile(0, id, &unallocated_file)); + ASSERT_OK(NewFile(&file_group, 0, id, &unallocated_file)); chmod(scratch_subdir.c_str(), S_IRWXU); FileSystemUtil::RemovePaths(tmp_dirs); @@ -242,40 +258,49 @@ TEST_F(TmpFileMgrTest, TestScratchLimit) { const int64_t LIMIT = 100; const int64_t FILE1_ALLOC = 25; const int64_t FILE2_ALLOC = LIMIT - FILE1_ALLOC; - TmpFileMgr::FileGroup file_group(&tmp_file_mgr, LIMIT); + TmpFileMgr::FileGroup file_group(&tmp_file_mgr, profile_, LIMIT); TmpFileMgr::File* file1; TmpFileMgr::File* file2; TUniqueId id; - ASSERT_OK(file_group.NewFile(0, id, &file1)); - ASSERT_OK(file_group.NewFile(1, id, &file2)); + ASSERT_OK(NewFile(&file_group, 0, id, &file1)); + ASSERT_OK(NewFile(&file_group, 1, id, &file2)); // Test individual limit is enforced. Status status; int64_t offset; - status = file1->AllocateSpace(LIMIT + 1, &offset); - ASSERT_FALSE(status.ok()); + TmpFileMgr::File* alloc_file; + // Alloc from both files should fail. + for (int i = 0; i <= 1; ++i) { + status = file_group.AllocateSpace(LIMIT + 1, &alloc_file, &offset); + ASSERT_FALSE(status.ok()); + ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED); + } + + // Alloc from file 1 should succeed. ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED); - ASSERT_OK(file1->AllocateSpace(FILE1_ALLOC, &offset)); + ASSERT_OK(file_group.AllocateSpace(FILE1_ALLOC, &alloc_file, &offset)); + ASSERT_EQ(alloc_file, file1); // Should select files round-robin. ASSERT_EQ(0, offset); - // Test aggregate limit is enforced. - status = file2->AllocateSpace(FILE2_ALLOC + 1, &offset); - ASSERT_FALSE(status.ok()); - ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED); - ASSERT_OK(file2->AllocateSpace(FILE2_ALLOC, &offset)); + // Test aggregate limit is enforced on both files. 
+ for (int i = 0; i <= 1; ++i) { + status = file_group.AllocateSpace(FILE2_ALLOC + 1, &alloc_file, &offset); + ASSERT_FALSE(status.ok()); + ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED); + } + + // Allocate up to the max. + ASSERT_OK(file_group.AllocateSpace(FILE2_ALLOC, &alloc_file, &offset)); ASSERT_EQ(0, offset); - status = file2->AllocateSpace(1, &offset); + ASSERT_EQ(alloc_file, file2); + + // Test aggregate limit still enforced. + status = file_group.AllocateSpace(1, &alloc_file, &offset); ASSERT_FALSE(status.ok()); ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED); file_group.Close(); } - } -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST); - impala::InitFeSupport(); - return RUN_ALL_TESTS(); -} +IMPALA_TEST_MAIN(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/46f5ad48/be/src/runtime/tmp-file-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc index 713cb47..dc35600 100644 --- a/be/src/runtime/tmp-file-mgr.cc +++ b/be/src/runtime/tmp-file-mgr.cc @@ -28,6 +28,7 @@ #include "util/debug-util.h" #include "util/disk-info.h" #include "util/filesystem-util.h" +#include "util/runtime-profile-counters.h" #include "common/names.h" @@ -221,13 +222,8 @@ TmpFileMgr::File::File(TmpFileMgr* mgr, FileGroup* file_group, DeviceId device_i DCHECK(file_group != NULL); } -Status TmpFileMgr::File::AllocateSpace(int64_t write_size, int64_t* offset) { - DCHECK_GT(write_size, 0); - if (file_group_->bytes_limit_ != -1 && - file_group_->current_bytes_allocated_ + write_size - > file_group_->bytes_limit_) { - return Status(TErrorCode::SCRATCH_LIMIT_EXCEEDED, file_group_->bytes_limit_); - } +Status TmpFileMgr::File::AllocateSpace(int64_t num_bytes, int64_t* offset) { + DCHECK_GT(num_bytes, 0); Status status; if (mgr_->IsBlacklisted(device_id_)) { 
blacklisted_ = true; @@ -242,14 +238,13 @@ Status TmpFileMgr::File::AllocateSpace(int64_t write_size, int64_t* offset) { } disk_id_ = DiskInfo::disk_id(path_.c_str()); } - int64_t new_size = current_size_ + write_size; + int64_t new_size = current_size_ + num_bytes; status = FileSystemUtil::ResizeFile(path_, new_size); if (!status.ok()) { ReportIOError(status.msg()); return status; } *offset = current_size_; - file_group_->current_bytes_allocated_ += write_size; current_size_ = new_size; return Status::OK(); } @@ -266,11 +261,46 @@ Status TmpFileMgr::File::Remove() { return Status::OK(); } -TmpFileMgr::FileGroup::FileGroup(TmpFileMgr* tmp_file_mgr, int64_t bytes_limit) +TmpFileMgr::FileGroup::FileGroup( + TmpFileMgr* tmp_file_mgr, RuntimeProfile* profile, int64_t bytes_limit) : tmp_file_mgr_(tmp_file_mgr), current_bytes_allocated_(0), - bytes_limit_(bytes_limit) { + bytes_limit_(bytes_limit), + next_allocation_index_(0) { DCHECK(tmp_file_mgr != NULL); + scratch_space_bytes_used_counter_ = + ADD_COUNTER(profile, "ScratchFileUsedBytes", TUnit::BYTES); +} + +Status TmpFileMgr::FileGroup::CreateFiles(const TUniqueId& query_id) { + DCHECK(tmp_files_.empty()); + vector<Status> errs; + vector<DeviceId> tmp_devices = tmp_file_mgr_->active_tmp_devices(); + int files_allocated = 0; + // Initialize the tmp files and the initial file to use. + for (int i = 0; i < tmp_devices.size(); ++i) { + TmpFileMgr::DeviceId tmp_device_id = tmp_devices[i]; + // It is possible for a device to be blacklisted after it was returned by + // active_tmp_devices(), handle this gracefully by skipping devices if NewFile() + // fails. 
+ Status status = NewFile(tmp_device_id, query_id); + if (status.ok()) { + ++files_allocated; + } else { + errs.push_back(std::move(status)); + } + } + DCHECK_EQ(tmp_files_.size(), files_allocated); + if (tmp_files_.size() == 0) { + Status err_status("Could not create files in any configured scratch directories " + "(--scratch_dirs)."); + for (Status& err : errs) err_status.MergeStatus(err); + return err_status; + } + + // Start allocating on a random device to avoid overloading the first device. + next_allocation_index_ = rand() % tmp_files_.size(); + return Status::OK(); } Status TmpFileMgr::FileGroup::NewFile(const DeviceId& device_id, @@ -293,4 +323,34 @@ void TmpFileMgr::FileGroup::Close() { tmp_files_.clear(); } -} //namespace impala +Status TmpFileMgr::FileGroup::AllocateSpace( + int64_t num_bytes, File** tmp_file, int64_t* file_offset) { + if (bytes_limit_ != -1 && current_bytes_allocated_ + num_bytes > bytes_limit_) { + return Status(TErrorCode::SCRATCH_LIMIT_EXCEEDED, bytes_limit_); + } + vector<Status> errs; + // Find the next physical file in round-robin order and allocate a range from it. + for (int attempt = 0; attempt < tmp_files_.size(); ++attempt) { + *tmp_file = tmp_files_[next_allocation_index_].get(); + next_allocation_index_ = (next_allocation_index_ + 1) % tmp_files_.size(); + if ((*tmp_file)->is_blacklisted()) continue; + Status status = (*tmp_file)->AllocateSpace(num_bytes, file_offset); + if (status.ok()) { + scratch_space_bytes_used_counter_->Add(num_bytes); + current_bytes_allocated_ += num_bytes; + return Status::OK(); + } + // Log error and try other files if there was a problem. Problematic files will be + // blacklisted so we will not repeatedly log the same error. + LOG(WARNING) << "Error while allocating range in scratch file '" + << (*tmp_file)->path() << "': " << status.msg().msg() + << ". 
Will try another scratch file."; + errs.push_back(status); + } + Status err_status("No usable scratch files: space could not be allocated in any " + "of the configured scratch directories (--scratch_dirs)."); + for (Status& err : errs) err_status.MergeStatus(err); + return err_status; +} + +} // namespace impala http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/46f5ad48/be/src/runtime/tmp-file-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h index 2940cf0..3c489b2 100644 --- a/be/src/runtime/tmp-file-mgr.h +++ b/be/src/runtime/tmp-file-mgr.h @@ -19,8 +19,9 @@ #define IMPALA_RUNTIME_TMP_FILE_MGR_H #include "common/status.h" -#include "gen-cpp/Types_types.h" // for TUniqueId +#include "gen-cpp/Types_types.h" // for TUniqueId #include "util/collection-metrics.h" +#include "util/runtime-profile.h" #include "util/spinlock.h" namespace impala { @@ -51,16 +52,6 @@ class TmpFileMgr { /// Creation of the file is deferred until the first call to AllocateSpace(). class File { public: - /// Allocates 'write_size' bytes in this file for a new block of data only if it - /// does not cross the allocation limit of its associated FileGroup. - /// The file size is increased by a call to truncate() if necessary. - /// The physical file is created on the first call to AllocateSpace(). - /// Returns Status::OK() and sets offset on success. - /// Returns an error status if an unexpected error occurs or if allowing the - /// allocation would exceed the allocation limit of its associated FileGroup. - /// If an error status is returned, the caller can try a different temporary file. 
- Status AllocateSpace(int64_t write_size, int64_t* offset); - /// Called to notify TmpFileMgr that an IO error was encountered for this file void ReportIOError(const ErrorMsg& msg); @@ -71,6 +62,15 @@ class TmpFileMgr { private: friend class FileGroup; friend class TmpFileMgr; + friend class TmpFileMgrTest; + + /// Allocates 'num_bytes' bytes in this file for a new block of data. + /// The file size is increased by a call to truncate() if necessary. + /// The physical file is created on the first call to AllocateSpace(). + /// Returns Status::OK() and sets offset on success. + /// Returns an error status if an unexpected error occurs, e.g. the file could not + /// be created. + Status AllocateSpace(int64_t num_bytes, int64_t* offset); /// Delete the physical file on disk, if one was created. /// It is not valid to read or write to a file after calling Remove(). @@ -112,42 +112,49 @@ class TmpFileMgr { bool blacklisted_; }; - /// Represents a group of files. The total allocated bytes of the group can be bound by - /// setting the space allocation limit. The owner of the FileGroup object is - /// responsible for calling the Close method to delete all the files in the group. + /// Represents a group of temporary files - one per disk with a scratch directory. The + /// total allocated bytes of the group can be bound by setting the space allocation + /// limit. The owner of the FileGroup object is responsible for calling the Close() + /// method to delete all the files in the group. class FileGroup { - public: - FileGroup(TmpFileMgr* tmp_file_mgr, int64_t bytes_limit = -1); + public: + /// Initialize a new file group, which will create files using 'tmp_file_mgr'. + /// Adds counters to 'profile' to track scratch space used. 'bytes_limit' is + /// the limit on the total file space to allocate. 
+ FileGroup( + TmpFileMgr* tmp_file_mgr, RuntimeProfile* profile, int64_t bytes_limit = -1); + + ~FileGroup() { DCHECK_EQ(NumFiles(), 0); } + + /// Initializes the file group with one temporary file per disk with a scratch + /// directory. 'unique_id' is a unique ID that should be used to prefix any + /// scratch file names. It is an error to create multiple FileGroups with the + /// same 'unique_id'. Returns OK if at least one temporary file could be created. + /// Returns an error if no temporary files were successfully created. Must only be + /// called once. + Status CreateFiles(const TUniqueId& unique_id); + + /// Allocate num_bytes bytes in a temporary file. Try multiple disks if error occurs. + /// Returns an error only if no temporary files are usable or the scratch limit is + /// exceeded. + Status AllocateSpace(int64_t num_bytes, File** tmp_file, int64_t* file_offset); - ~FileGroup(){ - DCHECK_EQ(NumFiles(), 0); - } + /// Calls Remove() on all the files in the group and deletes them. + void Close(); + + /// Returns the number of files that are a part of the group. + int NumFiles() { return tmp_files_.size(); } + + private: + friend class TmpFileMgrTest; /// Creates a new File with a unique path for a query instance, adds it to the /// group and returns a handle for that file. The file path is within the (single) /// tmp directory on the specified device id. /// If an error is encountered, e.g. the device is blacklisted, the file is not /// added to this group and a non-ok status is returned. - Status NewFile(const DeviceId& device_id, const TUniqueId& query_id, - File** new_file = NULL); - - /// Returns a file handle at the specified index in the group. - File* GetFileAt(int index) { - DCHECK_GE(index, 0); - DCHECK_LT(index, NumFiles()); - return tmp_files_[index].get(); - } - - /// Calls Remove() on all the files in the group and deletes them. - void Close(); - - /// Returns the number of files that are a part of the group. 
- int NumFiles() { - return tmp_files_.size(); - } - - private: - friend class File; + Status NewFile( + const DeviceId& device_id, const TUniqueId& unique_id, File** new_file = NULL); /// The TmpFileMgr it is associated with. TmpFileMgr* tmp_file_mgr_; @@ -159,7 +166,15 @@ class TmpFileMgr { int64_t current_bytes_allocated_; /// Max write space allowed (-1 means no limit). - int64_t bytes_limit_; + const int64_t bytes_limit_; + + /// Index into 'tmp_files' denoting the file to which the next temporary file range + /// should be allocated from. Used to implement round-robin allocation from temporary + /// files. + int next_allocation_index_; + + /// Amount of scratch space allocated in bytes. + RuntimeProfile::Counter* scratch_space_bytes_used_counter_; }; TmpFileMgr(); @@ -192,7 +207,7 @@ class TmpFileMgr { /// responsible for deleting it. The file is not created - creation is deferred until /// the first call to File::AllocateSpace(). Status NewFile(FileGroup* file_group, const DeviceId& device_id, - const TUniqueId& query_id, std::unique_ptr<File>* new_file); + const TUniqueId& unique_id, std::unique_ptr<File>* new_file); /// Dir stores information about a temporary directory. class Dir {
