Repository: incubator-impala
Updated Branches:
  refs/heads/master e98d2f1c0 -> 87fc463e0
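The core of this change is a new ownership model for DiskIoRequestContext: RegisterContext() now returns a std::unique_ptr<DiskIoRequestContext> owned by the caller, instead of filling in a raw pointer owned by the IoMgr's (now removed) RequestContextCache. A minimal sketch of the resulting read-path lifecycle, pieced together from the signatures and test updates in the diff below (variable names are illustrative; error handling is elided):

  // The caller owns the context; the IoMgr no longer recycles context objects.
  std::unique_ptr<DiskIoRequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);

  // IoMgr entry points still take a raw pointer, so pass reader.get().
  ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
  DiskIoMgr::ScanRange* range;
  ASSERT_OK(io_mgr.GetNextRange(reader.get(), &range));  // range is nullptr once all ranges are consumed

  // UnregisterContext() must still be called once per RegisterContext(): it cancels the
  // context and blocks until the disk threads drop their references, after which the
  // unique_ptr can be destroyed safely.
  io_mgr.UnregisterContext(reader.get());
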
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87fc463e/be/src/runtime/disk-io-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc index 8d36ea6..eb8d3c7 100644 --- a/be/src/runtime/disk-io-mgr-test.cc +++ b/be/src/runtime/disk-io-mgr-test.cc @@ -20,13 +20,14 @@ #include <boost/thread/thread.hpp> #include <sys/stat.h> -#include "testutil/gtest-util.h" #include "codegen/llvm-codegen.h" #include "common/init.h" -#include "runtime/disk-io-mgr.h" +#include "runtime/disk-io-mgr-reader-context.h" #include "runtime/disk-io-mgr-stress.h" +#include "runtime/disk-io-mgr.h" #include "runtime/mem-tracker.h" #include "runtime/thread-resource-mgr.h" +#include "testutil/gtest-util.h" #include "util/condition-variable.h" #include "util/cpu-info.h" #include "util/disk-info.h" @@ -62,7 +63,7 @@ class DiskIoMgrTest : public testing::Test { } if (status.ok()) { DiskIoMgr::ScanRange* scan_range = pool_.Add(new DiskIoMgr::ScanRange()); - scan_range->Reset(NULL, (*written_range)->file(), (*written_range)->len(), + scan_range->Reset(nullptr, (*written_range)->file(), (*written_range)->len(), (*written_range)->offset(), 0, false, DiskIoMgr::BufferOpts::Uncached()); ValidateSyncRead(io_mgr, reader, scan_range, reinterpret_cast<const char*>(data), sizeof(int32_t)); @@ -87,14 +88,14 @@ class DiskIoMgrTest : public testing::Test { protected: void CreateTempFile(const char* filename, const char* data) { FILE* file = fopen(filename, "w"); - EXPECT_TRUE(file != NULL); + EXPECT_TRUE(file != nullptr); fwrite(data, 1, strlen(data), file); fclose(file); } int CreateTempFile(const char* filename, int file_size) { FILE* file = fopen(filename, "w"); - EXPECT_TRUE(file != NULL); + EXPECT_TRUE(file != nullptr); int success = fclose(file); if (success != 0) { LOG(ERROR) << "Error closing file " << filename; @@ -116,7 +117,7 @@ class DiskIoMgrTest : public testing::Test { DiskIoMgr::ScanRange* range, const char* expected, int expected_len = -1) { unique_ptr<DiskIoMgr::BufferDescriptor> buffer; ASSERT_OK(io_mgr->Read(reader, range, &buffer)); - ASSERT_TRUE(buffer != NULL); + ASSERT_TRUE(buffer != nullptr); EXPECT_EQ(buffer->len(), range->len()); if (expected_len < 0) expected_len = strlen(expected); int cmp = memcmp(buffer->buffer(), expected, expected_len); @@ -133,8 +134,8 @@ class DiskIoMgrTest : public testing::Test { unique_ptr<DiskIoMgr::BufferDescriptor> buffer; Status status = range->GetNext(&buffer); ASSERT_TRUE(status.ok() || status.code() == expected_status.code()); - if (buffer == NULL || !status.ok()) { - if (buffer != NULL) io_mgr->ReturnBuffer(move(buffer)); + if (buffer == nullptr || !status.ok()) { + if (buffer != nullptr) io_mgr->ReturnBuffer(move(buffer)); break; } ASSERT_LE(buffer->len(), expected_len); @@ -155,7 +156,7 @@ class DiskIoMgrTest : public testing::Test { DiskIoMgr::ScanRange* range; Status status = io_mgr->GetNextRange(reader, &range); ASSERT_TRUE(status.ok() || status.code() == expected_status.code()); - if (range == NULL) break; + if (range == nullptr) break; ValidateScanRange(io_mgr, range, expected_result, expected_len, expected_status); num_ranges_processed->Add(1); ++num_ranges; @@ -167,9 +168,9 @@ class DiskIoMgrTest : public testing::Test { } DiskIoMgr::ScanRange* InitRange(const char* file_path, int offset, int len, - int disk_id, int64_t mtime, void* meta_data = NULL, bool is_cached = false) { + int disk_id, int64_t mtime, void* meta_data = nullptr, bool 
is_cached = false) { DiskIoMgr::ScanRange* range = AllocateRange(); - range->Reset(NULL, file_path, len, offset, disk_id, true, + range->Reset(nullptr, file_path, len, offset, disk_id, true, DiskIoMgr::BufferOpts(is_cached, mtime), meta_data); EXPECT_EQ(mtime, range->mtime()); return range; @@ -203,26 +204,25 @@ TEST_F(DiskIoMgrTest, SingleWriter) { scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 1, 10)); MemTracker reader_mem_tracker(LARGE_MEM_LIMIT); ASSERT_OK(read_io_mgr->Init(&reader_mem_tracker)); - DiskIoRequestContext* reader; - read_io_mgr->RegisterContext(&reader, &reader_mem_tracker); + unique_ptr<DiskIoRequestContext> reader = + read_io_mgr->RegisterContext(&reader_mem_tracker); for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) { for (int num_disks = 1; num_disks <= 5; num_disks += 2) { pool_.Clear(); // Destroy scan ranges from previous iterations. DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10); ASSERT_OK(io_mgr.Init(&mem_tracker)); - DiskIoRequestContext* writer; - io_mgr.RegisterContext(&writer, &mem_tracker); + unique_ptr<DiskIoRequestContext> writer = io_mgr.RegisterContext(&mem_tracker); for (int i = 0; i < num_ranges; ++i) { int32_t* data = pool_.Add(new int32_t); *data = rand(); DiskIoMgr::WriteRange** new_range = pool_.Add(new DiskIoMgr::WriteRange*); DiskIoMgr::WriteRange::WriteDoneCallback callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_ranges, - new_range, read_io_mgr.get(), reader, data, Status::OK(), _1); - *new_range = pool_.Add(new DiskIoMgr::WriteRange(tmp_file, cur_offset, - num_ranges % num_disks, callback)); + new_range, read_io_mgr.get(), reader.get(), data, Status::OK(), _1); + *new_range = pool_.Add(new DiskIoMgr::WriteRange( + tmp_file, cur_offset, num_ranges % num_disks, callback)); (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t)); - EXPECT_OK(io_mgr.AddWriteRange(writer, *new_range)); + EXPECT_OK(io_mgr.AddWriteRange(writer.get(), *new_range)); cur_offset += sizeof(int32_t); } @@ -231,11 +231,11 @@ TEST_F(DiskIoMgrTest, SingleWriter) { while (num_ranges_written_ < num_ranges) writes_done_.Wait(lock); } num_ranges_written_ = 0; - io_mgr.UnregisterContext(writer); + io_mgr.UnregisterContext(writer.get()); } } - read_io_mgr->UnregisterContext(reader); + read_io_mgr->UnregisterContext(reader.get()); read_io_mgr.reset(); } @@ -247,8 +247,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) { string tmp_file = "/non-existent/file.txt"; DiskIoMgr io_mgr(1, 1, 1, 1, 10); ASSERT_OK(io_mgr.Init(&mem_tracker)); - DiskIoRequestContext* writer; - io_mgr.RegisterContext(&writer, NULL); + unique_ptr<DiskIoRequestContext> writer = io_mgr.RegisterContext(nullptr); int32_t* data = pool_.Add(new int32_t); *data = rand(); @@ -256,12 +255,12 @@ TEST_F(DiskIoMgrTest, InvalidWrite) { DiskIoMgr::WriteRange** new_range = pool_.Add(new DiskIoMgr::WriteRange*); DiskIoMgr::WriteRange::WriteDoneCallback callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, new_range, - (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL, data, + (DiskIoMgr*)nullptr, (DiskIoRequestContext*)nullptr, data, Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1); *new_range = pool_.Add(new DiskIoMgr::WriteRange(tmp_file, rand(), 0, callback)); (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t)); - EXPECT_OK(io_mgr.AddWriteRange(writer, *new_range)); + EXPECT_OK(io_mgr.AddWriteRange(writer.get(), *new_range)); // Write to a bad location in a file that exists. 
tmp_file = "/tmp/disk_io_mgr_test.txt"; @@ -273,19 +272,19 @@ TEST_F(DiskIoMgrTest, InvalidWrite) { new_range = pool_.Add(new DiskIoMgr::WriteRange*); callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, - new_range, (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL, + new_range, (DiskIoMgr*)nullptr, (DiskIoRequestContext*)nullptr, data, Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1); *new_range = pool_.Add(new DiskIoMgr::WriteRange(tmp_file, -1, 0, callback)); (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t)); - EXPECT_OK(io_mgr.AddWriteRange(writer, *new_range)); + EXPECT_OK(io_mgr.AddWriteRange(writer.get(), *new_range)); { unique_lock<mutex> lock(written_mutex_); while (num_ranges_written_ < 2) writes_done_.Wait(lock); } num_ranges_written_ = 0; - io_mgr.UnregisterContext(writer); + io_mgr.UnregisterContext(writer.get()); } // Issue a number of writes, cancel the writer context and issue more writes. @@ -309,33 +308,31 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) { scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 1, 10)); MemTracker reader_mem_tracker(LARGE_MEM_LIMIT); ASSERT_OK(read_io_mgr->Init(&reader_mem_tracker)); - DiskIoRequestContext* reader; - read_io_mgr->RegisterContext(&reader, &reader_mem_tracker); + unique_ptr<DiskIoRequestContext> reader = + read_io_mgr->RegisterContext(&reader_mem_tracker); for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) { for (int num_disks = 1; num_disks <= 5; num_disks += 2) { pool_.Clear(); // Destroy scan ranges from previous iterations. DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10); ASSERT_OK(io_mgr.Init(&mem_tracker)); - DiskIoRequestContext* writer; - io_mgr.RegisterContext(&writer, &mem_tracker); + unique_ptr<DiskIoRequestContext> writer = io_mgr.RegisterContext(&mem_tracker); Status validate_status = Status::OK(); for (int i = 0; i < num_ranges; ++i) { if (i == num_ranges_before_cancel) { - io_mgr.CancelContext(writer); + io_mgr.CancelContext(writer.get()); validate_status = Status::CANCELLED; } int32_t* data = pool_.Add(new int32_t); *data = rand(); DiskIoMgr::WriteRange** new_range = pool_.Add(new DiskIoMgr::WriteRange*); - DiskIoMgr::WriteRange::WriteDoneCallback callback = - bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, - num_ranges_before_cancel, new_range, read_io_mgr.get(), reader, data, - Status::CANCELLED, _1); - *new_range = pool_.Add(new DiskIoMgr::WriteRange(tmp_file, cur_offset, - num_ranges % num_disks, callback)); + DiskIoMgr::WriteRange::WriteDoneCallback callback = bind( + mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_ranges_before_cancel, + new_range, read_io_mgr.get(), reader.get(), data, Status::CANCELLED, _1); + *new_range = pool_.Add(new DiskIoMgr::WriteRange( + tmp_file, cur_offset, num_ranges % num_disks, callback)); (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t)); cur_offset += sizeof(int32_t); - Status add_status = io_mgr.AddWriteRange(writer, *new_range); + Status add_status = io_mgr.AddWriteRange(writer.get(), *new_range); EXPECT_TRUE(add_status.code() == validate_status.code()); } @@ -344,11 +341,11 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) { while (num_ranges_written_ < num_ranges_before_cancel) writes_done_.Wait(lock); } num_ranges_written_ = 0; - io_mgr.UnregisterContext(writer); + io_mgr.UnregisterContext(writer.get()); } } - read_io_mgr->UnregisterContext(reader); + read_io_mgr->UnregisterContext(reader.get()); read_io_mgr.reset(); } @@ 
-379,27 +376,26 @@ TEST_F(DiskIoMgrTest, SingleReader) { ASSERT_OK(io_mgr.Init(&mem_tracker)); MemTracker reader_mem_tracker; - DiskIoRequestContext* reader; - io_mgr.RegisterContext(&reader, &reader_mem_tracker); + unique_ptr<DiskIoRequestContext> reader = + io_mgr.RegisterContext(&reader_mem_tracker); vector<DiskIoMgr::ScanRange*> ranges; for (int i = 0; i < len; ++i) { int disk_id = i % num_disks; - ranges.push_back( - InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime)); + ranges.push_back(InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime)); } - ASSERT_OK(io_mgr.AddScanRanges(reader, ranges)); + ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges)); AtomicInt32 num_ranges_processed; thread_group threads; for (int i = 0; i < num_read_threads; ++i) { - threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data, - len, Status::OK(), 0, &num_ranges_processed)); + threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data, len, + Status::OK(), 0, &num_ranges_processed)); } threads.join_all(); EXPECT_EQ(num_ranges_processed.Load(), ranges.size()); - io_mgr.UnregisterContext(reader); + io_mgr.UnregisterContext(reader.get()); EXPECT_EQ(reader_mem_tracker.consumption(), 0); } } @@ -431,8 +427,8 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) { ASSERT_OK(io_mgr.Init(&mem_tracker)); MemTracker reader_mem_tracker; - DiskIoRequestContext* reader; - io_mgr.RegisterContext(&reader, &reader_mem_tracker); + unique_ptr<DiskIoRequestContext> reader = + io_mgr.RegisterContext(&reader_mem_tracker); vector<DiskIoMgr::ScanRange*> ranges_first_half; vector<DiskIoMgr::ScanRange*> ranges_second_half; @@ -449,25 +445,25 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) { AtomicInt32 num_ranges_processed; // Issue first half the scan ranges. - ASSERT_OK(io_mgr.AddScanRanges(reader, ranges_first_half)); + ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges_first_half)); // Read a couple of them - ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::OK(), 2, + ScanRangeThread(&io_mgr, reader.get(), data, strlen(data), Status::OK(), 2, &num_ranges_processed); // Issue second half - ASSERT_OK(io_mgr.AddScanRanges(reader, ranges_second_half)); + ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges_second_half)); // Start up some threads and then cancel thread_group threads; for (int i = 0; i < 3; ++i) { - threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data, + threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data, strlen(data), Status::CANCELLED, 0, &num_ranges_processed)); } threads.join_all(); EXPECT_EQ(num_ranges_processed.Load(), len); - io_mgr.UnregisterContext(reader); + io_mgr.UnregisterContext(reader.get()); EXPECT_EQ(reader_mem_tracker.consumption(), 0); } } @@ -501,44 +497,43 @@ TEST_F(DiskIoMgrTest, SyncReadTest) { ASSERT_OK(io_mgr.Init(&mem_tracker)); MemTracker reader_mem_tracker; - DiskIoRequestContext* reader; - io_mgr.RegisterContext(&reader, &reader_mem_tracker); + unique_ptr<DiskIoRequestContext> reader = + io_mgr.RegisterContext(&reader_mem_tracker); - DiskIoMgr::ScanRange* complete_range = InitRange(tmp_file, 0, strlen(data), 0, - stat_val.st_mtime); + DiskIoMgr::ScanRange* complete_range = + InitRange(tmp_file, 0, strlen(data), 0, stat_val.st_mtime); // Issue some reads before the async ones are issued - ValidateSyncRead(&io_mgr, reader, complete_range, data); - ValidateSyncRead(&io_mgr, reader, complete_range, data); + ValidateSyncRead(&io_mgr, reader.get(), complete_range, data); + ValidateSyncRead(&io_mgr, reader.get(), 
complete_range, data); vector<DiskIoMgr::ScanRange*> ranges; for (int i = 0; i < len; ++i) { int disk_id = i % num_disks; - ranges.push_back(InitRange(tmp_file, 0, len, disk_id, - stat_val.st_mtime)); + ranges.push_back(InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime)); } - ASSERT_OK(io_mgr.AddScanRanges(reader, ranges)); + ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges)); AtomicInt32 num_ranges_processed; thread_group threads; for (int i = 0; i < 5; ++i) { - threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data, + threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data, strlen(data), Status::OK(), 0, &num_ranges_processed)); } // Issue some more sync ranges for (int i = 0; i < 5; ++i) { sched_yield(); - ValidateSyncRead(&io_mgr, reader, complete_range, data); + ValidateSyncRead(&io_mgr, reader.get(), complete_range, data); } threads.join_all(); - ValidateSyncRead(&io_mgr, reader, complete_range, data); - ValidateSyncRead(&io_mgr, reader, complete_range, data); + ValidateSyncRead(&io_mgr, reader.get(), complete_range, data); + ValidateSyncRead(&io_mgr, reader.get(), complete_range, data); EXPECT_EQ(num_ranges_processed.Load(), ranges.size()); - io_mgr.UnregisterContext(reader); + io_mgr.UnregisterContext(reader.get()); EXPECT_EQ(reader_mem_tracker.consumption(), 0); } } @@ -569,21 +564,21 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) { ASSERT_OK(io_mgr.Init(&mem_tracker)); MemTracker reader_mem_tracker; - DiskIoRequestContext* reader; - io_mgr.RegisterContext(&reader, &reader_mem_tracker); + unique_ptr<DiskIoRequestContext> reader = + io_mgr.RegisterContext(&reader_mem_tracker); vector<DiskIoMgr::ScanRange*> ranges; for (int i = 0; i < len; ++i) { int disk_id = i % num_disks; ranges.push_back(InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime)); } - ASSERT_OK(io_mgr.AddScanRanges(reader, ranges)); + ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges)); AtomicInt32 num_ranges_processed; int num_succesful_ranges = ranges.size() / 2; // Read half the ranges for (int i = 0; i < num_succesful_ranges; ++i) { - ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::OK(), 1, + ScanRangeThread(&io_mgr, reader.get(), data, strlen(data), Status::OK(), 1, &num_ranges_processed); } EXPECT_EQ(num_ranges_processed.Load(), num_succesful_ranges); @@ -591,16 +586,16 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) { // Start up some threads and then cancel thread_group threads; for (int i = 0; i < 3; ++i) { - threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data, + threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data, strlen(data), Status::CANCELLED, 0, &num_ranges_processed)); } - io_mgr.CancelContext(reader); + io_mgr.CancelContext(reader.get()); sched_yield(); threads.join_all(); - EXPECT_TRUE(io_mgr.context_status(reader).IsCancelled()); - io_mgr.UnregisterContext(reader); + EXPECT_TRUE(io_mgr.context_status(reader.get()).IsCancelled()); + io_mgr.UnregisterContext(reader.get()); EXPECT_EQ(reader_mem_tracker.consumption(), 0); } } @@ -627,20 +622,19 @@ TEST_F(DiskIoMgrTest, MemLimits) { ASSERT_OK(io_mgr.Init(&root_mem_tracker)); MemTracker reader_mem_tracker(-1, "Reader", &root_mem_tracker); - DiskIoRequestContext* reader; - io_mgr.RegisterContext(&reader, &reader_mem_tracker); + unique_ptr<DiskIoRequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker); vector<DiskIoMgr::ScanRange*> ranges; for (int i = 0; i < num_ranges; ++i) { ranges.push_back(InitRange(tmp_file, 0, len, 0, stat_val.st_mtime)); } - 
ASSERT_OK(io_mgr.AddScanRanges(reader, ranges)); + ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges)); // Don't return buffers to force memory pressure vector<unique_ptr<DiskIoMgr::BufferDescriptor>> buffers; AtomicInt32 num_ranges_processed; - ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::MemLimitExceeded(), + ScanRangeThread(&io_mgr, reader.get(), data, strlen(data), Status::MemLimitExceeded(), 1, &num_ranges_processed); char result[strlen(data) + 1]; @@ -648,16 +642,16 @@ TEST_F(DiskIoMgrTest, MemLimits) { // to go over the limit eventually. while (true) { memset(result, 0, strlen(data) + 1); - DiskIoMgr::ScanRange* range = NULL; - Status status = io_mgr.GetNextRange(reader, &range); + DiskIoMgr::ScanRange* range = nullptr; + Status status = io_mgr.GetNextRange(reader.get(), &range); ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded()); - if (range == NULL) break; + if (range == nullptr) break; while (true) { unique_ptr<DiskIoMgr::BufferDescriptor> buffer; Status status = range->GetNext(&buffer); ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded()); - if (buffer == NULL) break; + if (buffer == nullptr) break; memcpy(result + range->offset() + buffer->scan_range_offset(), buffer->buffer(), buffer->len()); buffers.push_back(move(buffer)); @@ -669,8 +663,8 @@ TEST_F(DiskIoMgrTest, MemLimits) { io_mgr.ReturnBuffer(move(buffers[i])); } - EXPECT_TRUE(io_mgr.context_status(reader).IsMemLimitExceeded()); - io_mgr.UnregisterContext(reader); + EXPECT_TRUE(io_mgr.context_status(reader.get()).IsMemLimitExceeded()); + io_mgr.UnregisterContext(reader.get()); EXPECT_EQ(reader_mem_tracker.consumption(), 0); } } @@ -696,44 +690,43 @@ TEST_F(DiskIoMgrTest, CachedReads) { ASSERT_OK(io_mgr.Init(&mem_tracker)); MemTracker reader_mem_tracker; - DiskIoRequestContext* reader; - io_mgr.RegisterContext(&reader, &reader_mem_tracker); + unique_ptr<DiskIoRequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker); DiskIoMgr::ScanRange* complete_range = - InitRange(tmp_file, 0, strlen(data), 0, stat_val.st_mtime, NULL, true); + InitRange(tmp_file, 0, strlen(data), 0, stat_val.st_mtime, nullptr, true); // Issue some reads before the async ones are issued - ValidateSyncRead(&io_mgr, reader, complete_range, data); - ValidateSyncRead(&io_mgr, reader, complete_range, data); + ValidateSyncRead(&io_mgr, reader.get(), complete_range, data); + ValidateSyncRead(&io_mgr, reader.get(), complete_range, data); vector<DiskIoMgr::ScanRange*> ranges; for (int i = 0; i < len; ++i) { int disk_id = i % num_disks; ranges.push_back( - InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime, NULL, true)); + InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime, nullptr, true)); } - ASSERT_OK(io_mgr.AddScanRanges(reader, ranges)); + ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges)); AtomicInt32 num_ranges_processed; thread_group threads; for (int i = 0; i < 5; ++i) { - threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data, + threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader.get(), data, strlen(data), Status::OK(), 0, &num_ranges_processed)); } // Issue some more sync ranges for (int i = 0; i < 5; ++i) { sched_yield(); - ValidateSyncRead(&io_mgr, reader, complete_range, data); + ValidateSyncRead(&io_mgr, reader.get(), complete_range, data); } threads.join_all(); - ValidateSyncRead(&io_mgr, reader, complete_range, data); - ValidateSyncRead(&io_mgr, reader, complete_range, data); + ValidateSyncRead(&io_mgr, reader.get(), complete_range, data); + ValidateSyncRead(&io_mgr, 
reader.get(), complete_range, data); EXPECT_EQ(num_ranges_processed.Load(), ranges.size()); - io_mgr.UnregisterContext(reader); + io_mgr.UnregisterContext(reader.get()); EXPECT_EQ(reader_mem_tracker.consumption(), 0); } EXPECT_EQ(mem_tracker.consumption(), 0); @@ -761,7 +754,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) { stat(file_name.c_str(), &stat_val); int64_t iters = 0; - vector<DiskIoRequestContext*> contexts(num_contexts); + vector<unique_ptr<DiskIoRequestContext>> contexts(num_contexts); Status status; for (int iteration = 0; iteration < ITERATIONS; ++iteration) { for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) { @@ -770,7 +763,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) { MAX_BUFFER_SIZE); ASSERT_OK(io_mgr.Init(&mem_tracker)); for (int file_index = 0; file_index < num_contexts; ++file_index) { - io_mgr.RegisterContext(&contexts[file_index], &mem_tracker); + contexts[file_index] = io_mgr.RegisterContext(&mem_tracker); } pool_.Clear(); int read_offset = 0; @@ -783,12 +776,12 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) { vector<DiskIoMgr::ScanRange*> ranges; int num_scan_ranges = min<int>(num_reads_queued, write_offset - read_offset); for (int i = 0; i < num_scan_ranges; ++i) { - ranges.push_back(InitRange(file_name.c_str(), read_offset, 1, - i % num_disks, stat_val.st_mtime)); - threads.add_thread(new thread(ScanRangeThread, &io_mgr, - contexts[context_index], - reinterpret_cast<const char*>(data + (read_offset % strlen(data))), 1, - Status::OK(), num_scan_ranges, &num_ranges_processed)); + ranges.push_back(InitRange( + file_name.c_str(), read_offset, 1, i % num_disks, stat_val.st_mtime)); + threads.add_thread( + new thread(ScanRangeThread, &io_mgr, contexts[context_index].get(), + reinterpret_cast<const char*>(data + (read_offset % strlen(data))), + 1, Status::OK(), num_scan_ranges, &num_ranges_processed)); ++read_offset; } @@ -798,12 +791,12 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) { DiskIoMgr::WriteRange::WriteDoneCallback callback = bind(mem_fn(&DiskIoMgrTest::WriteCompleteCallback), this, num_write_ranges, _1); - DiskIoMgr::WriteRange* new_range = pool_.Add( - new DiskIoMgr::WriteRange(file_name, - write_offset, i % num_disks, callback)); - new_range->SetData(reinterpret_cast<const uint8_t*> - (data + (write_offset % strlen(data))), 1); - status = io_mgr.AddWriteRange(contexts[context_index], new_range); + DiskIoMgr::WriteRange* new_range = pool_.Add(new DiskIoMgr::WriteRange( + file_name, write_offset, i % num_disks, callback)); + new_range->SetData( + reinterpret_cast<const uint8_t*>(data + (write_offset % strlen(data))), + 1); + status = io_mgr.AddWriteRange(contexts[context_index].get(), new_range); ++write_offset; } @@ -816,9 +809,8 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) { } // for (int context_index } // while (read_offset < file_size) - for (int file_index = 0; file_index < num_contexts; ++file_index) { - io_mgr.UnregisterContext(contexts[file_index]); + io_mgr.UnregisterContext(contexts[file_index].get()); } } // for (int num_disks } // for (int threads_per_disk @@ -836,7 +828,7 @@ TEST_F(DiskIoMgrTest, MultipleReader) { vector<string> file_names; vector<int64_t> mtimes; vector<string> data; - vector<DiskIoRequestContext*> readers; + vector<unique_ptr<DiskIoRequestContext>> readers; vector<char*> results; file_names.resize(NUM_READERS); @@ -884,30 +876,28 @@ TEST_F(DiskIoMgrTest, MultipleReader) { EXPECT_OK(io_mgr.Init(&mem_tracker)); for (int i = 0; i < NUM_READERS; ++i) { - io_mgr.RegisterContext(&readers[i], 
&mem_tracker); + readers[i] = io_mgr.RegisterContext(&mem_tracker); vector<DiskIoMgr::ScanRange*> ranges; for (int j = 0; j < DATA_LEN; ++j) { int disk_id = j % num_disks; - ranges.push_back( - InitRange(file_names[i].c_str(), j, 1, disk_id, mtimes[i])); + ranges.push_back(InitRange(file_names[i].c_str(), j, 1, disk_id, mtimes[i])); } - ASSERT_OK(io_mgr.AddScanRanges(readers[i], ranges)); + ASSERT_OK(io_mgr.AddScanRanges(readers[i].get(), ranges)); } AtomicInt32 num_ranges_processed; thread_group threads; for (int i = 0; i < NUM_READERS; ++i) { for (int j = 0; j < NUM_THREADS_PER_READER; ++j) { - threads.add_thread(new thread(ScanRangeThread, &io_mgr, readers[i], - data[i].c_str(), data[i].size(), Status::OK(), 0, - &num_ranges_processed)); + threads.add_thread(new thread(ScanRangeThread, &io_mgr, readers[i].get(), + data[i].c_str(), data[i].size(), Status::OK(), 0, &num_ranges_processed)); } } threads.join_all(); EXPECT_EQ(num_ranges_processed.Load(), DATA_LEN * NUM_READERS); for (int i = 0; i < NUM_READERS; ++i) { - io_mgr.UnregisterContext(readers[i]); + io_mgr.UnregisterContext(readers[i].get()); } } } @@ -935,16 +925,16 @@ TEST_F(DiskIoMgrTest, Buffers) { ASSERT_EQ(root_mem_tracker.consumption(), 0); MemTracker reader_mem_tracker(-1, "Reader", &root_mem_tracker); - DiskIoRequestContext* reader; - io_mgr.RegisterContext(&reader, &reader_mem_tracker); + unique_ptr<DiskIoRequestContext> reader; + reader = io_mgr.RegisterContext(&reader_mem_tracker); DiskIoMgr::ScanRange* dummy_range = InitRange("dummy", 0, 0, 0, 0); // buffer length should be rounded up to min buffer size int64_t buffer_len = 1; unique_ptr<DiskIoMgr::BufferDescriptor> buffer_desc; - buffer_desc = io_mgr.GetFreeBuffer(reader, dummy_range, buffer_len); - EXPECT_TRUE(buffer_desc->buffer() != NULL); + buffer_desc = io_mgr.GetFreeBuffer(reader.get(), dummy_range, buffer_len); + EXPECT_TRUE(buffer_desc->buffer() != nullptr); EXPECT_EQ(min_buffer_size, buffer_desc->buffer_len()); EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load()); io_mgr.FreeBufferMemory(buffer_desc.get()); @@ -953,8 +943,8 @@ TEST_F(DiskIoMgrTest, Buffers) { // reuse buffer buffer_len = min_buffer_size; - buffer_desc = io_mgr.GetFreeBuffer(reader, dummy_range, buffer_len); - EXPECT_TRUE(buffer_desc->buffer() != NULL); + buffer_desc = io_mgr.GetFreeBuffer(reader.get(), dummy_range, buffer_len); + EXPECT_TRUE(buffer_desc->buffer() != nullptr); EXPECT_EQ(min_buffer_size, buffer_desc->buffer_len()); EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load()); io_mgr.FreeBufferMemory(buffer_desc.get()); @@ -963,8 +953,8 @@ TEST_F(DiskIoMgrTest, Buffers) { // bump up to next buffer size buffer_len = min_buffer_size + 1; - buffer_desc = io_mgr.GetFreeBuffer(reader, dummy_range, buffer_len); - EXPECT_TRUE(buffer_desc->buffer() != NULL); + buffer_desc = io_mgr.GetFreeBuffer(reader.get(), dummy_range, buffer_len); + EXPECT_TRUE(buffer_desc->buffer() != nullptr); EXPECT_EQ(min_buffer_size * 2, buffer_desc->buffer_len()); EXPECT_EQ(2, io_mgr.num_allocated_buffers_.Load()); EXPECT_EQ(min_buffer_size * 3, root_mem_tracker.consumption()); @@ -979,8 +969,8 @@ TEST_F(DiskIoMgrTest, Buffers) { // max buffer size buffer_len = max_buffer_size; - buffer_desc = io_mgr.GetFreeBuffer(reader, dummy_range, buffer_len); - EXPECT_TRUE(buffer_desc->buffer() != NULL); + buffer_desc = io_mgr.GetFreeBuffer(reader.get(), dummy_range, buffer_len); + EXPECT_TRUE(buffer_desc->buffer() != nullptr); EXPECT_EQ(max_buffer_size, buffer_desc->buffer_len()); EXPECT_EQ(2, io_mgr.num_allocated_buffers_.Load()); 
io_mgr.FreeBufferMemory(buffer_desc.get()); @@ -991,7 +981,7 @@ TEST_F(DiskIoMgrTest, Buffers) { io_mgr.GcIoBuffers(); EXPECT_EQ(io_mgr.num_allocated_buffers_.Load(), 0); EXPECT_EQ(root_mem_tracker.consumption(), 0); - io_mgr.UnregisterContext(reader); + io_mgr.UnregisterContext(reader.get()); } // IMPALA-2366: handle partial read where range goes past end of file. @@ -1011,19 +1001,19 @@ TEST_F(DiskIoMgrTest, PartialRead) { ASSERT_OK(io_mgr->Init(&mem_tracker)); MemTracker reader_mem_tracker; - DiskIoRequestContext* reader; - io_mgr->RegisterContext(&reader, &reader_mem_tracker); + unique_ptr<DiskIoRequestContext> reader; + reader = io_mgr->RegisterContext(&reader_mem_tracker); // We should not read past the end of file. DiskIoMgr::ScanRange* range = InitRange(tmp_file, 0, read_len, 0, stat_val.st_mtime); unique_ptr<DiskIoMgr::BufferDescriptor> buffer; - ASSERT_OK(io_mgr->Read(reader, range, &buffer)); + ASSERT_OK(io_mgr->Read(reader.get(), range, &buffer)); ASSERT_TRUE(buffer->eosr()); ASSERT_EQ(len, buffer->len()); ASSERT_TRUE(memcmp(buffer->buffer(), data, len) == 0); io_mgr->ReturnBuffer(move(buffer)); - io_mgr->UnregisterContext(reader); + io_mgr->UnregisterContext(reader.get()); pool_.Clear(); io_mgr.reset(); EXPECT_EQ(reader_mem_tracker.consumption(), 0); @@ -1043,17 +1033,17 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) { ASSERT_OK(io_mgr->Init(&mem_tracker)); // Reader doesn't need to provide mem tracker if it's providing buffers. - MemTracker* reader_mem_tracker = NULL; - DiskIoRequestContext* reader; - io_mgr->RegisterContext(&reader, reader_mem_tracker); + MemTracker* reader_mem_tracker = nullptr; + unique_ptr<DiskIoRequestContext> reader; + reader = io_mgr->RegisterContext(reader_mem_tracker); for (int buffer_len : vector<int>({len - 1, len, len + 1})) { vector<uint8_t> client_buffer(buffer_len); int scan_len = min(len, buffer_len); DiskIoMgr::ScanRange* range = AllocateRange(); - range->Reset(NULL, tmp_file, scan_len, 0, 0, true, + range->Reset(nullptr, tmp_file, scan_len, 0, 0, true, DiskIoMgr::BufferOpts::ReadInto(client_buffer.data(), buffer_len)); - ASSERT_OK(io_mgr->AddScanRange(reader, range, true)); + ASSERT_OK(io_mgr->AddScanRange(reader.get(), range, true)); unique_ptr<DiskIoMgr::BufferDescriptor> io_buffer; ASSERT_OK(range->GetNext(&io_buffer)); @@ -1067,7 +1057,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) { io_mgr->ReturnBuffer(move(io_buffer)); } - io_mgr->UnregisterContext(reader); + io_mgr->UnregisterContext(reader.get()); pool_.Clear(); io_mgr.reset(); EXPECT_EQ(mem_tracker.consumption(), 0); @@ -1083,19 +1073,19 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) { ASSERT_OK(io_mgr->Init(&mem_tracker)); // Reader doesn't need to provide mem tracker if it's providing buffers. - MemTracker* reader_mem_tracker = NULL; - DiskIoRequestContext* reader; + MemTracker* reader_mem_tracker = nullptr; + unique_ptr<DiskIoRequestContext> reader; vector<uint8_t> client_buffer(SCAN_LEN); for (int i = 0; i < 1000; ++i) { - io_mgr->RegisterContext(&reader, reader_mem_tracker); + reader = io_mgr->RegisterContext(reader_mem_tracker); DiskIoMgr::ScanRange* range = AllocateRange(); - range->Reset(NULL, tmp_file, SCAN_LEN, 0, 0, true, + range->Reset(nullptr, tmp_file, SCAN_LEN, 0, 0, true, DiskIoMgr::BufferOpts::ReadInto(client_buffer.data(), SCAN_LEN)); - ASSERT_OK(io_mgr->AddScanRange(reader, range, true)); + ASSERT_OK(io_mgr->AddScanRange(reader.get(), range, true)); /// Also test the cancellation path. 
Run multiple iterations since it is racy whether /// the read fails before the cancellation. - if (i >= 1) io_mgr->CancelContext(reader); + if (i >= 1) io_mgr->CancelContext(reader.get()); unique_ptr<DiskIoMgr::BufferDescriptor> io_buffer; ASSERT_FALSE(range->GetNext(&io_buffer).ok()); @@ -1103,7 +1093,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) { // DiskIoMgr should not have allocated memory. EXPECT_EQ(mem_tracker.consumption(), 0); - io_mgr->UnregisterContext(reader); + io_mgr->UnregisterContext(reader.get()); } pool_.Clear(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87fc463e/be/src/runtime/disk-io-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc index 8af70f5..d614ac7 100644 --- a/be/src/runtime/disk-io-mgr.cc +++ b/be/src/runtime/disk-io-mgr.cc @@ -123,90 +123,8 @@ static inline bool is_file_handle_caching_enabled() { } } -// This class provides a cache of DiskIoRequestContext objects. DiskIoRequestContexts -// are recycled. This is good for locality as well as lock contention. The cache has -// the property that regardless of how many clients get added/removed, the memory -// locations for existing clients do not change (not the case with std::vector) -// minimizing the locks we have to take across all readers. -// All functions on this object are thread safe -class DiskIoMgr::RequestContextCache { - public: - RequestContextCache(DiskIoMgr* io_mgr) : io_mgr_(io_mgr) {} - - // Returns a context to the cache. This object can now be reused. - void ReturnContext(DiskIoRequestContext* reader) { - DCHECK(reader->state_ != DiskIoRequestContext::Inactive); - reader->state_ = DiskIoRequestContext::Inactive; - lock_guard<mutex> l(lock_); - inactive_contexts_.push_back(reader); - } - - // Returns a new DiskIoRequestContext object. Allocates a new object if necessary. - DiskIoRequestContext* GetNewContext() { - lock_guard<mutex> l(lock_); - if (!inactive_contexts_.empty()) { - DiskIoRequestContext* reader = inactive_contexts_.front(); - inactive_contexts_.pop_front(); - return reader; - } else { - DiskIoRequestContext* reader = - new DiskIoRequestContext(io_mgr_, io_mgr_->num_total_disks()); - all_contexts_.push_back(reader); - return reader; - } - } - - // This object has the same lifetime as the disk IoMgr. - ~RequestContextCache() { - for (list<DiskIoRequestContext*>::iterator it = all_contexts_.begin(); - it != all_contexts_.end(); ++it) { - delete *it; - } - } - - // Validates that all readers are cleaned up and in the inactive state. No locks - // are taken since this is only called from the disk IoMgr destructor. - bool ValidateAllInactive() { - for (list<DiskIoRequestContext*>::iterator it = all_contexts_.begin(); - it != all_contexts_.end(); ++it) { - if ((*it)->state_ != DiskIoRequestContext::Inactive) { - return false; - } - } - DCHECK_EQ(all_contexts_.size(), inactive_contexts_.size()); - return all_contexts_.size() == inactive_contexts_.size(); - } - - string DebugString(); - - private: - DiskIoMgr* io_mgr_; - - // lock to protect all members below - mutex lock_; - - // List of all request contexts created. Used for debugging - list<DiskIoRequestContext*> all_contexts_; - - // List of inactive readers. These objects can be used for a new reader. 
- list<DiskIoRequestContext*> inactive_contexts_; -}; - -string DiskIoMgr::RequestContextCache::DebugString() { - lock_guard<mutex> l(lock_); - stringstream ss; - for (list<DiskIoRequestContext*>::iterator it = all_contexts_.begin(); - it != all_contexts_.end(); ++it) { - unique_lock<mutex> lock((*it)->lock_); - ss << (*it)->DebugString() << endl; - } - return ss.str(); -} - string DiskIoMgr::DebugString() { stringstream ss; - ss << "RequestContexts: " << endl << request_context_cache_->DebugString() << endl; - ss << "Disks: " << endl; for (int i = 0; i < disk_queues_.size(); ++i) { unique_lock<mutex> lock(disk_queues_[i]->lock); @@ -358,9 +276,6 @@ DiskIoMgr::~DiskIoMgr() { } } - DCHECK(request_context_cache_.get() == nullptr || - request_context_cache_->ValidateAllInactive()) - << endl << DebugString(); DCHECK_EQ(num_buffers_in_readers_.Load(), 0); // Delete all allocated buffers @@ -407,7 +322,6 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) { disk_thread_group_.AddThread(move(t)); } } - request_context_cache_.reset(new RequestContextCache(this)); RETURN_IF_ERROR(file_handle_cache_.Init()); cached_read_options_ = hadoopRzOptionsAlloc(); @@ -422,24 +336,13 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) { return Status::OK(); } -void DiskIoMgr::RegisterContext(DiskIoRequestContext** request_context, - MemTracker* mem_tracker) { - DCHECK(request_context_cache_.get() != nullptr) << "Must call Init() first."; - *request_context = request_context_cache_->GetNewContext(); - (*request_context)->Reset(mem_tracker); +unique_ptr<DiskIoRequestContext> DiskIoMgr::RegisterContext(MemTracker* mem_tracker) { + return unique_ptr<DiskIoRequestContext>( + new DiskIoRequestContext(this, num_total_disks(), mem_tracker)); } void DiskIoMgr::UnregisterContext(DiskIoRequestContext* reader) { - // Blocking cancel (waiting for disks completion). - CancelContext(reader, true); - - // All the disks are done with clean, validate nothing is leaking. - unique_lock<mutex> reader_lock(reader->lock_); - DCHECK_EQ(reader->num_buffers_in_reader_.Load(), 0) << endl << reader->DebugString(); - DCHECK_EQ(reader->num_used_buffers_.Load(), 0) << endl << reader->DebugString(); - - DCHECK(reader->Validate()) << endl << reader->DebugString(); - request_context_cache_->ReturnContext(reader); + reader->CancelAndMarkInactive(); } // Cancellation requires coordination from multiple threads. Each thread that currently @@ -461,17 +364,8 @@ void DiskIoMgr::UnregisterContext(DiskIoRequestContext* reader) { // state, removes the context from the disk queue. The last thread per disk with an // outstanding reference to the context decrements the number of disk queues the context // is on. -// If wait_for_disks_completion is true, wait for the number of active disks to become 0. 
-void DiskIoMgr::CancelContext(DiskIoRequestContext* context, bool wait_for_disks_completion) { +void DiskIoMgr::CancelContext(DiskIoRequestContext* context) { context->Cancel(Status::CANCELLED); - - if (wait_for_disks_completion) { - unique_lock<mutex> lock(context->lock_); - DCHECK(context->Validate()) << endl << context->DebugString(); - while (context->num_disks_with_ranges_ > 0) { - context->disks_complete_cond_var_.Wait(lock); - } - } } void DiskIoMgr::set_read_timer(DiskIoRequestContext* r, RuntimeProfile::Counter* c) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87fc463e/be/src/runtime/disk-io-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h index e7a7122..49de0ff 100644 --- a/be/src/runtime/disk-io-mgr.h +++ b/be/src/runtime/disk-io-mgr.h @@ -645,33 +645,27 @@ class DiskIoMgr : public CacheLineAligned { Status Init(MemTracker* process_mem_tracker) WARN_UNUSED_RESULT; /// Allocates tracking structure for a request context. - /// Register a new request context which is returned in *request_context. - /// The IoMgr owns the allocated DiskIoRequestContext object. The caller must call + /// Register a new request context and return it to the caller. The caller must call /// UnregisterContext() for each context. /// reader_mem_tracker: Is non-null only for readers. IO buffers /// used for this reader will be tracked by this. If the limit is exceeded /// the reader will be cancelled and MEM_LIMIT_EXCEEDED will be returned via /// GetNext(). - void RegisterContext(DiskIoRequestContext** request_context, - MemTracker* reader_mem_tracker); - - /// Unregisters context from the disk IoMgr. This must be called for every - /// RegisterContext() regardless of cancellation and must be called in the - /// same thread as GetNext() - /// The 'context' cannot be used after this call. - /// This call blocks until all the disk threads have finished cleaning up. - /// UnregisterContext also cancels the reader/writer from the disk IoMgr. + std::unique_ptr<DiskIoRequestContext> RegisterContext(MemTracker* reader_mem_tracker); + + /// Unregisters context from the disk IoMgr by first cancelling it then blocking until + /// all references to the context are removed from I/O manager internal data structures. + /// This must be called for every RegisterContext() to ensure that the context object + /// can be safely destroyed. It is invalid to add more request ranges to 'context' after + /// after this call. This call blocks until all the disk threads have finished cleaning + /// up. void UnregisterContext(DiskIoRequestContext* context); /// This function cancels the context asychronously. All outstanding requests /// are aborted and tracking structures cleaned up. This does not need to be /// called if the context finishes normally. /// This will also fail any outstanding GetNext()/Read requests. - /// If wait_for_disks_completion is true, wait for the number of active disks for this - /// context to reach 0. After calling with wait_for_disks_completion = true, the only - /// valid API is returning IO buffers that have already been returned. - /// Takes context->lock_ if wait_for_disks_completion is true. - void CancelContext(DiskIoRequestContext* context, bool wait_for_disks_completion = false); + void CancelContext(DiskIoRequestContext* context); /// Adds the scan ranges to the queues. This call is non-blocking. 
The caller must /// not deallocate the scan range pointers before UnregisterContext(). @@ -825,7 +819,6 @@ class DiskIoMgr : public CacheLineAligned { friend class BufferDescriptor; friend class DiskIoRequestContext; struct DiskQueue; - class RequestContextCache; friend class DiskIoMgrTest_Buffers_Test; friend class DiskIoMgrTest_VerifyNumThreadsParameter_Test; @@ -868,12 +861,6 @@ class DiskIoMgr : public CacheLineAligned { /// Total time spent in hdfs reading RuntimeProfile::Counter read_timer_; - /// Contains all contexts that the IoMgr is tracking. This includes contexts that are - /// active as well as those in the process of being cancelled. This is a cache - /// of context objects that get recycled to minimize object allocations and lock - /// contention. - boost::scoped_ptr<RequestContextCache> request_context_cache_; - /// Protects free_buffers_ boost::mutex free_buffers_lock_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87fc463e/be/src/runtime/runtime-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc index 3415b0d..308b2c4 100644 --- a/be/src/runtime/runtime-state.cc +++ b/be/src/runtime/runtime-state.cc @@ -223,22 +223,8 @@ Status RuntimeState::CheckQueryState() { return GetQueryStatus(); } -void RuntimeState::AcquireReaderContext(DiskIoRequestContext* reader_context) { - boost::lock_guard<SpinLock> l(reader_contexts_lock_); - reader_contexts_.push_back(reader_context); -} - -void RuntimeState::UnregisterReaderContexts() { - boost::lock_guard<SpinLock> l(reader_contexts_lock_); - for (DiskIoRequestContext* context : reader_contexts_) { - io_mgr()->UnregisterContext(context); - } - reader_contexts_.clear(); -} - void RuntimeState::ReleaseResources() { DCHECK(!released_resources_); - UnregisterReaderContexts(); if (filter_bank_ != nullptr) filter_bank_->Close(); if (resource_pool_ != nullptr) { exec_env_->thread_mgr()->UnregisterPool(resource_pool_); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87fc463e/be/src/runtime/runtime-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h index c8ae147..74c27e5 100644 --- a/be/src/runtime/runtime-state.h +++ b/be/src/runtime/runtime-state.h @@ -36,7 +36,6 @@ class BufferPool; class DataStreamRecvr; class DescriptorTbl; class DiskIoMgr; -class DiskIoRequestContext; class Expr; class LlvmCodeGen; class MemTracker; @@ -195,14 +194,6 @@ class RuntimeState { return !CodegenDisabledByQueryOption() && !CodegenDisabledByHint(); } - /// Takes ownership of a scan node's reader context and plan fragment executor will call - /// UnregisterReaderContexts() to unregister it when the fragment is closed. The IO - /// buffers may still be in use and thus the deferred unregistration. - void AcquireReaderContext(DiskIoRequestContext* reader_context); - - /// Unregisters all reader contexts acquired through AcquireReaderContext(). - void UnregisterReaderContexts(); - inline Status GetQueryStatus() { // Do a racy check for query_status_ to avoid unnecessary spinlock acquisition. if (UNLIKELY(!query_status_.ok())) { @@ -389,12 +380,6 @@ class RuntimeState { SpinLock query_status_lock_; Status query_status_; - /// Reader contexts that need to be closed when the fragment is closed. 
- /// Synchronization is needed if there are multiple scan nodes in a plan fragment and - /// Close() may be called on them concurrently (see IMPALA-4180). - SpinLock reader_contexts_lock_; - std::vector<DiskIoRequestContext*> reader_contexts_; - /// This is the node id of the root node for this plan fragment. This is used as the /// hash seed and has two useful properties: /// 1) It is the same for all exec nodes in a fragment, so the resulting hash values http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87fc463e/be/src/runtime/tmp-file-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc index 5f482ba..dde6348 100644 --- a/be/src/runtime/tmp-file-mgr-test.cc +++ b/be/src/runtime/tmp-file-mgr-test.cc @@ -132,7 +132,7 @@ class TmpFileMgrTest : public ::testing::Test { /// Helper to cancel the FileGroup DiskIoRequestContext. static void CancelIoContext(TmpFileMgr::FileGroup* group) { - group->io_mgr_->CancelContext(group->io_ctx_); + group->io_mgr_->CancelContext(group->io_ctx_.get()); } /// Helper to get the # of bytes allocated by the group. Validates that the sum across http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87fc463e/be/src/runtime/tmp-file-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc index 8916d4f..f1e243c 100644 --- a/be/src/runtime/tmp-file-mgr.cc +++ b/be/src/runtime/tmp-file-mgr.cc @@ -26,6 +26,7 @@ #include <gutil/strings/join.h> #include <gutil/strings/substitute.h> +#include "runtime/disk-io-mgr-reader-context.h" #include "runtime/runtime-state.h" #include "runtime/tmp-file-mgr-internal.h" #include "util/bit-util.h" @@ -240,7 +241,7 @@ TmpFileMgr::FileGroup::FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr, next_allocation_index_(0), free_ranges_(64) { DCHECK(tmp_file_mgr != nullptr); - io_mgr_->RegisterContext(&io_ctx_, nullptr); + io_ctx_ = io_mgr_->RegisterContext(nullptr); } TmpFileMgr::FileGroup::~FileGroup() { @@ -282,8 +283,7 @@ Status TmpFileMgr::FileGroup::CreateFiles() { void TmpFileMgr::FileGroup::Close() { // Cancel writes before deleting the files, since in-flight writes could re-create // deleted files. 
- if (io_ctx_ != nullptr) io_mgr_->UnregisterContext(io_ctx_); - io_ctx_ = nullptr; + if (io_ctx_ != nullptr) io_mgr_->UnregisterContext(io_ctx_.get()); for (std::unique_ptr<TmpFileMgr::File>& file : tmp_files_) { Status status = file->Remove(); if (!status.ok()) { @@ -361,7 +361,7 @@ Status TmpFileMgr::FileGroup::Write( DiskIoMgr::WriteRange::WriteDoneCallback callback = [this, tmp_handle_ptr]( const Status& write_status) { WriteComplete(tmp_handle_ptr, write_status); }; RETURN_IF_ERROR( - tmp_handle->Write(io_mgr_, io_ctx_, tmp_file, file_offset, buffer, callback)); + tmp_handle->Write(io_mgr_, io_ctx_.get(), tmp_file, file_offset, buffer, callback)); write_counter_->Add(1); bytes_written_counter_->Add(buffer.len()); *handle = move(tmp_handle); @@ -394,7 +394,7 @@ Status TmpFileMgr::FileGroup::ReadAsync(WriteHandle* handle, MemRange buffer) { DiskIoMgr::BufferOpts::ReadInto(buffer.data(), buffer.len())); read_counter_->Add(1); bytes_read_counter_->Add(buffer.len()); - RETURN_IF_ERROR(io_mgr_->AddScanRange(io_ctx_, handle->read_range_, true)); + RETURN_IF_ERROR(io_mgr_->AddScanRange(io_ctx_.get(), handle->read_range_, true)); return Status::OK(); } @@ -489,7 +489,7 @@ Status TmpFileMgr::FileGroup::RecoverWriteError( // Choose another file to try. Blacklisting ensures we don't retry the same file. // If this fails, the status will include all the errors in 'scratch_errors_'. RETURN_IF_ERROR(AllocateSpace(handle->len(), &tmp_file, &file_offset)); - return handle->RetryWrite(io_mgr_, io_ctx_, tmp_file, file_offset); + return handle->RetryWrite(io_mgr_, io_ctx_.get(), tmp_file, file_offset); } string TmpFileMgr::FileGroup::DebugString() { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87fc463e/be/src/runtime/tmp-file-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h index ba7210d..f550af2 100644 --- a/be/src/runtime/tmp-file-mgr.h +++ b/be/src/runtime/tmp-file-mgr.h @@ -19,6 +19,7 @@ #define IMPALA_RUNTIME_TMP_FILE_MGR_H #include <functional> +#include <memory> #include <utility> #include <boost/scoped_ptr.hpp> @@ -200,7 +201,7 @@ class TmpFileMgr { DiskIoMgr* const io_mgr_; /// I/O context used for all reads and writes. Registered in constructor. - DiskIoRequestContext* io_ctx_; + std::unique_ptr<DiskIoRequestContext> io_ctx_; /// Stores scan ranges allocated in Read(). Needed because ScanRange objects may be /// touched by DiskIoMgr even after the scan is finished.
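
The write path in the tests and in TmpFileMgr::FileGroup follows the same ownership pattern; a rough sketch of queueing a write under the new API (names are illustrative; waiting for the completion callback is elided):

  std::unique_ptr<DiskIoRequestContext> writer = io_mgr.RegisterContext(&mem_tracker);

  DiskIoMgr::WriteRange::WriteDoneCallback callback =
      [](const Status& write_status) { /* record write_status and signal the waiter */ };
  DiskIoMgr::WriteRange* range =
      pool_.Add(new DiskIoMgr::WriteRange(file_name, file_offset, disk_id, callback));
  range->SetData(reinterpret_cast<const uint8_t*>(data), len);

  EXPECT_OK(io_mgr.AddWriteRange(writer.get(), range));
  // ... wait until the callback has run for every queued range ...
  io_mgr.UnregisterContext(writer.get());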

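Cancellation is also simplified: CancelContext() lost its wait_for_disks_completion parameter and is now purely asynchronous, while UnregisterContext() is the call that blocks. Roughly (a sketch; the precise guarantees are as documented in disk-io-mgr.h above):

  io_mgr.CancelContext(reader.get());      // asynchronous; outstanding GetNext()/Read() calls fail
  // Buffers already handed out can still be returned via ReturnBuffer().
  io_mgr.UnregisterContext(reader.get());  // cancels if not already cancelled, then blocks until
                                           // the IoMgr drops all references to the context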