IMPALA-5389: simplify BufferDescriptor lifetime

This is cleanup to make the code easier to understand in anticipation of some trickier changes to I/O buffer management for IMPALA-4835. I do not expect any changes in behaviour as a result of this patch.
* Use unique_ptr to make BufferDescriptor ownership transfer more explicit and allow enforcing that the buffers are not leaked via a DCHECK in ~BufferDescriptor.
* Remove the 'free_buffer_descs_' cache. TCMalloc is good at caching small objects, and allocating through it will likely incur much less lock contention than the cache's single global lock. The cache did not avoid calls to malloc() anyway, because appending to a std::list<> requires allocating a list node.
* Use std::deque instead of std::list in a couple of places: it offers O(1) push_*()/pop_*() at both ends and requires fewer calls into malloc()/free().

Testing:
Ran ASAN and exhaustive builds.

Change-Id: I007d098e9a1abb1f684be37b7f1ee6c03d3879b2
Reviewed-on: http://gerrit.cloudera.org:8080/7182
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/3480c892
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/3480c892
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/3480c892

Branch: refs/heads/master
Commit: 3480c892c86c41d0873c93e91f4ad5807a9daaee
Parents: 1fc7e65
Author: Tim Armstrong <[email protected]>
Authored: Tue May 30 13:20:40 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri Jun 16 09:38:23 2017 +0000

----------------------------------------------------------------------
be/src/exec/hdfs-parquet-scanner.cc | 4 +-
be/src/exec/scanner-context.cc | 16 ++--
be/src/exec/scanner-context.h | 6 +-
be/src/runtime/disk-io-mgr-scan-range.cc | 28 +++---
be/src/runtime/disk-io-mgr-stress.cc | 5 +-
be/src/runtime/disk-io-mgr-test.cc | 48 ++++++-----
be/src/runtime/disk-io-mgr.cc | 120 ++++++++------------------
be/src/runtime/disk-io-mgr.h | 93 ++++++++------------
be/src/runtime/row-batch.cc | 11 +--
be/src/runtime/row-batch.h | 4 +-
be/src/runtime/tmp-file-mgr.cc | 4 +-
11 files changed, 137 insertions(+), 
202 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3480c892/be/src/exec/hdfs-parquet-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc index 634d4ec..8de7c63 100644 --- a/be/src/exec/hdfs-parquet-scanner.cc +++ b/be/src/exec/hdfs-parquet-scanner.cc @@ -1357,13 +1357,13 @@ Status HdfsParquetScanner::ProcessFooter() { metadata_range_->disk_id(), metadata_range_->expected_local(), DiskIoMgr::BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size)); - DiskIoMgr::BufferDescriptor* io_buffer; + unique_ptr<DiskIoMgr::BufferDescriptor> io_buffer; RETURN_IF_ERROR( io_mgr->Read(scan_node_->reader_context(), metadata_range, &io_buffer)); DCHECK_EQ(io_buffer->buffer(), metadata_buffer.buffer()); DCHECK_EQ(io_buffer->len(), metadata_size); DCHECK(io_buffer->eosr()); - io_buffer->Return(); + io_mgr->ReturnBuffer(move(io_buffer)); } // Deserialize file header http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3480c892/be/src/exec/scanner-context.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc index 335c921..7a998b3 100644 --- a/be/src/exec/scanner-context.cc +++ b/be/src/exec/scanner-context.cc @@ -21,8 +21,9 @@ #include "exec/hdfs-scan-node-base.h" #include "exec/hdfs-scan-node.h" -#include "runtime/row-batch.h" +#include "runtime/exec-env.h" #include "runtime/mem-pool.h" +#include "runtime/row-batch.h" #include "runtime/runtime-state.h" #include "runtime/string-buffer.h" #include "util/debug-util.h" @@ -80,7 +81,6 @@ ScannerContext::Stream* ScannerContext::AddStream(DiskIoMgr::ScanRange* range) { stream->file_len_ = stream->file_desc_->file_length; stream->total_bytes_returned_ = 0; stream->io_buffer_pos_ = NULL; - stream->io_buffer_ = NULL; 
stream->io_buffer_bytes_left_ = 0; stream->boundary_buffer_bytes_left_ = 0; stream->output_buffer_pos_ = NULL; @@ -97,24 +97,23 @@ void ScannerContext::Stream::ReleaseCompletedResources(RowBatch* batch, bool don // Mark any pending resources as completed if (io_buffer_ != nullptr) { ++parent_->num_completed_io_buffers_; - completed_io_buffers_.push_back(io_buffer_); + completed_io_buffers_.push_back(move(io_buffer_)); } // Set variables to nullptr to make sure streams are not used again - io_buffer_ = nullptr; io_buffer_pos_ = nullptr; io_buffer_bytes_left_ = 0; // Cancel the underlying scan range to clean up any queued buffers there scan_range_->Cancel(Status::CANCELLED); } - for (DiskIoMgr::BufferDescriptor* buffer: completed_io_buffers_) { + for (unique_ptr<DiskIoMgr::BufferDescriptor>& buffer : completed_io_buffers_) { if (contains_tuple_data_ && batch != nullptr) { - batch->AddIoBuffer(buffer); + batch->AddIoBuffer(move(buffer)); // TODO: We can do row batch compaction here. This is the only place io buffers are // queued. A good heuristic is to check the number of io buffers queued and if // there are too many, we should compact. 
} else { - buffer->Return(); + ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(buffer)); parent_->scan_node_->num_owned_io_buffers_.Add(-1); } } @@ -147,8 +146,7 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) { if (io_buffer_ != NULL) { eosr = io_buffer_->eosr(); ++parent_->num_completed_io_buffers_; - completed_io_buffers_.push_back(io_buffer_); - io_buffer_ = NULL; + completed_io_buffers_.push_back(move(io_buffer_)); } if (!eosr) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3480c892/be/src/exec/scanner-context.h ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h index ff3cfa8..470ec01 100644 --- a/be/src/exec/scanner-context.h +++ b/be/src/exec/scanner-context.h @@ -19,6 +19,8 @@ #ifndef IMPALA_EXEC_SCANNER_CONTEXT_H #define IMPALA_EXEC_SCANNER_CONTEXT_H +#include <deque> + #include <boost/cstdint.hpp> #include <boost/scoped_ptr.hpp> @@ -198,7 +200,7 @@ class ScannerContext { int64_t next_read_past_size_bytes_; /// The current io buffer. This starts as NULL before we've read any bytes. - DiskIoMgr::BufferDescriptor* io_buffer_; + std::unique_ptr<DiskIoMgr::BufferDescriptor> io_buffer_; /// Next byte to read in io_buffer_ uint8_t* io_buffer_pos_; @@ -230,7 +232,7 @@ class ScannerContext { /// On the next GetBytes() call, these buffers are released (the caller by calling /// GetBytes() signals it is done with its previous bytes). At this point the /// buffers are either returned to the io mgr or attached to the current row batch. 
- std::list<DiskIoMgr::BufferDescriptor*> completed_io_buffers_; + std::deque<std::unique_ptr<DiskIoMgr::BufferDescriptor>> completed_io_buffers_; Stream(ScannerContext* parent); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3480c892/be/src/runtime/disk-io-mgr-scan-range.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc index 1528cd6..d909b94 100644 --- a/be/src/runtime/disk-io-mgr-scan-range.cc +++ b/be/src/runtime/disk-io-mgr-scan-range.cc @@ -47,7 +47,7 @@ DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum read chunk size to u // that buffers are queued and read in file order. // This must be called with the reader lock taken. -bool DiskIoMgr::ScanRange::EnqueueBuffer(BufferDescriptor* buffer) { +bool DiskIoMgr::ScanRange::EnqueueBuffer(unique_ptr<BufferDescriptor> buffer) { { unique_lock<mutex> scan_range_lock(lock_); DCHECK(Validate()) << DebugString(); @@ -60,12 +60,12 @@ bool DiskIoMgr::ScanRange::EnqueueBuffer(BufferDescriptor* buffer) { reader_->num_buffers_in_reader_.Add(1); } reader_->num_used_buffers_.Add(-1); - buffer->Return(); + io_mgr_->ReturnBuffer(move(buffer)); return false; } reader_->num_ready_buffers_.Add(1); - ready_buffers_.push_back(buffer); eosr_queued_ = buffer->eosr(); + ready_buffers_.emplace_back(move(buffer)); blocked_on_queue_ = ready_buffers_.size() >= ready_buffers_capacity_; if (blocked_on_queue_ && ready_buffers_capacity_ > MIN_QUEUE_CAPACITY) { @@ -81,9 +81,8 @@ bool DiskIoMgr::ScanRange::EnqueueBuffer(BufferDescriptor* buffer) { return blocked_on_queue_; } -Status DiskIoMgr::ScanRange::GetNext(BufferDescriptor** buffer) { - *buffer = nullptr; - +Status DiskIoMgr::ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) { + DCHECK(*buffer == nullptr); { unique_lock<mutex> scan_range_lock(lock_); if (eosr_returned_) return Status::OK(); @@ -107,7 +106,7 @@ Status 
DiskIoMgr::ScanRange::GetNext(BufferDescriptor** buffer) { // Remove the first ready buffer from the queue and return it DCHECK(!ready_buffers_.empty()); - *buffer = ready_buffers_.front(); + *buffer = move(ready_buffers_.front()); ready_buffers_.pop_front(); eosr_returned_ = (*buffer)->eosr(); } @@ -121,8 +120,7 @@ Status DiskIoMgr::ScanRange::GetNext(BufferDescriptor** buffer) { Status status = (*buffer)->status_; if (!status.ok()) { - (*buffer)->Return(); - *buffer = nullptr; + io_mgr_->ReturnBuffer(move(*buffer)); return status; } @@ -138,8 +136,7 @@ Status DiskIoMgr::ScanRange::GetNext(BufferDescriptor** buffer) { if (reader_->state_ == DiskIoRequestContext::Cancelled) { reader_->blocked_ranges_.Remove(this); Cancel(reader_->status_); - (*buffer)->Return(); - *buffer = nullptr; + io_mgr_->ReturnBuffer(move(*buffer)); return status_; } @@ -184,8 +181,7 @@ void DiskIoMgr::ScanRange::CleanupQueuedBuffers() { reader_->num_ready_buffers_.Add(-ready_buffers_.size()); while (!ready_buffers_.empty()) { - BufferDescriptor* buffer = ready_buffers_.front(); - buffer->Return(); + io_mgr_->ReturnBuffer(move(ready_buffers_.front())); ready_buffers_.pop_front(); } } @@ -605,13 +601,13 @@ Status DiskIoMgr::ScanRange::ReadFromCache(bool* read_succeeded) { // Create a single buffer desc for the entire scan range and enqueue that. // 'mem_tracker' is nullptr because the memory is owned by the HDFS java client, // not the Impala backend. 
- BufferDescriptor* desc = io_mgr_->GetBufferDesc(reader_, nullptr, this, - reinterpret_cast<uint8_t*>(buffer), 0); + unique_ptr<BufferDescriptor> desc = unique_ptr<BufferDescriptor>(new BufferDescriptor( + io_mgr_, reader_, this, reinterpret_cast<uint8_t*>(buffer), 0, nullptr)); desc->len_ = bytes_read; desc->scan_range_offset_ = 0; desc->eosr_ = true; bytes_read_ = bytes_read; - EnqueueBuffer(desc); + EnqueueBuffer(move(desc)); if (reader_->bytes_read_counter_ != nullptr) { COUNTER_ADD(reader_->bytes_read_counter_, bytes_read); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3480c892/be/src/runtime/disk-io-mgr-stress.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr-stress.cc b/be/src/runtime/disk-io-mgr-stress.cc index c25d6ef..af0c841 100644 --- a/be/src/runtime/disk-io-mgr-stress.cc +++ b/be/src/runtime/disk-io-mgr-stress.cc @@ -111,7 +111,7 @@ void DiskIoMgrStress::ClientThread(int client_id) { if (range == NULL) break; while (true) { - DiskIoMgr::BufferDescriptor* buffer; + unique_ptr<DiskIoMgr::BufferDescriptor> buffer; status = range->GetNext(&buffer); CHECK(status.ok() || status.IsCancelled()); if (buffer == NULL) break; @@ -133,8 +133,7 @@ void DiskIoMgrStress::ClientThread(int client_id) { // Copy the bytes from this read into the result buffer. 
memcpy(read_buffer + file_offset, buffer->buffer(), buffer->len()); - buffer->Return(); - buffer = NULL; + io_mgr_->ReturnBuffer(move(buffer)); bytes_read += len; CHECK_GE(bytes_read, 0); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3480c892/be/src/runtime/disk-io-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc index 9167a6c..38c0dd5 100644 --- a/be/src/runtime/disk-io-mgr-test.cc +++ b/be/src/runtime/disk-io-mgr-test.cc @@ -113,33 +113,33 @@ class DiskIoMgrTest : public testing::Test { static void ValidateSyncRead(DiskIoMgr* io_mgr, DiskIoRequestContext* reader, DiskIoMgr::ScanRange* range, const char* expected, int expected_len = -1) { - DiskIoMgr::BufferDescriptor* buffer; + unique_ptr<DiskIoMgr::BufferDescriptor> buffer; ASSERT_OK(io_mgr->Read(reader, range, &buffer)); ASSERT_TRUE(buffer != NULL); EXPECT_EQ(buffer->len(), range->len()); if (expected_len < 0) expected_len = strlen(expected); int cmp = memcmp(buffer->buffer(), expected, expected_len); EXPECT_TRUE(cmp == 0); - buffer->Return(); + io_mgr->ReturnBuffer(move(buffer)); } - static void ValidateScanRange(DiskIoMgr::ScanRange* range, const char* expected, - int expected_len, const Status& expected_status) { + static void ValidateScanRange(DiskIoMgr* io_mgr, DiskIoMgr::ScanRange* range, + const char* expected, int expected_len, const Status& expected_status) { char result[expected_len + 1]; memset(result, 0, expected_len + 1); while (true) { - DiskIoMgr::BufferDescriptor* buffer = NULL; + unique_ptr<DiskIoMgr::BufferDescriptor> buffer; Status status = range->GetNext(&buffer); ASSERT_TRUE(status.ok() || status.code() == expected_status.code()); if (buffer == NULL || !status.ok()) { - if (buffer != NULL) buffer->Return(); + if (buffer != NULL) io_mgr->ReturnBuffer(move(buffer)); break; } ASSERT_LE(buffer->len(), expected_len); memcpy(result + range->offset() + 
buffer->scan_range_offset(), buffer->buffer(), buffer->len()); - buffer->Return(); + io_mgr->ReturnBuffer(move(buffer)); } ValidateEmptyOrCorrect(expected, result, expected_len); } @@ -155,7 +155,7 @@ class DiskIoMgrTest : public testing::Test { Status status = io_mgr->GetNextRange(reader, &range); ASSERT_TRUE(status.ok() || status.code() == expected_status.code()); if (range == NULL) break; - ValidateScanRange(range, expected_result, expected_len, expected_status); + ValidateScanRange(io_mgr, range, expected_result, expected_len, expected_status); num_ranges_processed->Add(1); ++num_ranges; } @@ -653,7 +653,7 @@ TEST_F(DiskIoMgrTest, MemLimits) { ASSERT_OK(io_mgr.AddScanRanges(reader, ranges)); // Don't return buffers to force memory pressure - vector<DiskIoMgr::BufferDescriptor*> buffers; + vector<unique_ptr<DiskIoMgr::BufferDescriptor>> buffers; AtomicInt32 num_ranges_processed; ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::MemLimitExceeded(), @@ -670,19 +670,19 @@ TEST_F(DiskIoMgrTest, MemLimits) { if (range == NULL) break; while (true) { - DiskIoMgr::BufferDescriptor* buffer = NULL; + unique_ptr<DiskIoMgr::BufferDescriptor> buffer; Status status = range->GetNext(&buffer); ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded()); if (buffer == NULL) break; memcpy(result + range->offset() + buffer->scan_range_offset(), buffer->buffer(), buffer->len()); - buffers.push_back(buffer); + buffers.push_back(move(buffer)); } ValidateEmptyOrCorrect(data, result, strlen(data)); } for (int i = 0; i < buffers.size(); ++i) { - buffers[i]->Return(); + io_mgr.ReturnBuffer(move(buffers[i])); } EXPECT_TRUE(io_mgr.context_status(reader).IsMemLimitExceeded()); @@ -964,12 +964,13 @@ TEST_F(DiskIoMgrTest, Buffers) { // buffer length should be rounded up to min buffer size int64_t buffer_len = 1; - DiskIoMgr::BufferDescriptor* buffer_desc; + unique_ptr<DiskIoMgr::BufferDescriptor> buffer_desc; buffer_desc = io_mgr.GetFreeBuffer(reader, dummy_range, buffer_len); 
EXPECT_TRUE(buffer_desc->buffer() != NULL); EXPECT_EQ(min_buffer_size, buffer_desc->buffer_len()); EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load()); - io_mgr.FreeBufferMemory(buffer_desc); + io_mgr.FreeBufferMemory(buffer_desc.get()); + io_mgr.ReturnBuffer(move(buffer_desc)); EXPECT_EQ(min_buffer_size, root_mem_tracker.consumption()); // reuse buffer @@ -978,7 +979,8 @@ TEST_F(DiskIoMgrTest, Buffers) { EXPECT_TRUE(buffer_desc->buffer() != NULL); EXPECT_EQ(min_buffer_size, buffer_desc->buffer_len()); EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load()); - io_mgr.FreeBufferMemory(buffer_desc); + io_mgr.FreeBufferMemory(buffer_desc.get()); + io_mgr.ReturnBuffer(move(buffer_desc)); EXPECT_EQ(min_buffer_size, root_mem_tracker.consumption()); // bump up to next buffer size @@ -994,7 +996,8 @@ TEST_F(DiskIoMgrTest, Buffers) { EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load()); EXPECT_EQ(min_buffer_size * 2, root_mem_tracker.consumption()); - io_mgr.FreeBufferMemory(buffer_desc); + io_mgr.FreeBufferMemory(buffer_desc.get()); + io_mgr.ReturnBuffer(move(buffer_desc)); // max buffer size buffer_len = max_buffer_size; @@ -1002,7 +1005,8 @@ TEST_F(DiskIoMgrTest, Buffers) { EXPECT_TRUE(buffer_desc->buffer() != NULL); EXPECT_EQ(max_buffer_size, buffer_desc->buffer_len()); EXPECT_EQ(2, io_mgr.num_allocated_buffers_.Load()); - io_mgr.FreeBufferMemory(buffer_desc); + io_mgr.FreeBufferMemory(buffer_desc.get()); + io_mgr.ReturnBuffer(move(buffer_desc)); EXPECT_EQ(min_buffer_size * 2 + max_buffer_size, root_mem_tracker.consumption()); // gc buffers @@ -1034,12 +1038,12 @@ TEST_F(DiskIoMgrTest, PartialRead) { // We should not read past the end of file. 
DiskIoMgr::ScanRange* range = InitRange(1, tmp_file, 0, read_len, 0, stat_val.st_mtime); - DiskIoMgr::BufferDescriptor* buffer; + unique_ptr<DiskIoMgr::BufferDescriptor> buffer; ASSERT_OK(io_mgr->Read(reader, range, &buffer)); ASSERT_TRUE(buffer->eosr()); ASSERT_EQ(len, buffer->len()); ASSERT_TRUE(memcmp(buffer->buffer(), data, len) == 0); - buffer->Return(); + io_mgr->ReturnBuffer(move(buffer)); io_mgr->UnregisterContext(reader); pool_.reset(); @@ -1073,7 +1077,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) { DiskIoMgr::BufferOpts::ReadInto(&client_buffer[0], buffer_len)); ASSERT_OK(io_mgr->AddScanRange(reader, range, true)); - DiskIoMgr::BufferDescriptor* io_buffer; + unique_ptr<DiskIoMgr::BufferDescriptor> io_buffer; ASSERT_OK(range->GetNext(&io_buffer)); ASSERT_TRUE(io_buffer->eosr()); ASSERT_EQ(scan_len, io_buffer->len()); @@ -1082,7 +1086,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) { // DiskIoMgr should not have allocated memory. EXPECT_EQ(mem_tracker.consumption(), 0); - io_buffer->Return(); + io_mgr->ReturnBuffer(move(io_buffer)); } io_mgr->UnregisterContext(reader); @@ -1115,7 +1119,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) { /// the read fails before the cancellation. if (i >= 1) io_mgr->CancelContext(reader); - DiskIoMgr::BufferDescriptor* io_buffer; + unique_ptr<DiskIoMgr::BufferDescriptor> io_buffer; ASSERT_FALSE(range->GetNext(&io_buffer).ok()); // DiskIoMgr should not have allocated memory. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3480c892/be/src/runtime/disk-io-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc index 89a8863..bfb6d60 100644 --- a/be/src/runtime/disk-io-mgr.cc +++ b/be/src/runtime/disk-io-mgr.cc @@ -196,41 +196,21 @@ string DiskIoMgr::DebugString() { return ss.str(); } -DiskIoMgr::BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr) : io_mgr_(io_mgr) { - Reset(); -} - -void DiskIoMgr::BufferDescriptor::Reset() { - DCHECK(io_mgr_ != nullptr); - reader_ = nullptr; - scan_range_ = nullptr; - mem_tracker_ = nullptr; - buffer_ = nullptr; - buffer_len_ = 0; - len_ = 0; - eosr_ = false; - status_ = Status::OK(); - scan_range_offset_ = 0; -} - -void DiskIoMgr::BufferDescriptor::Reset(DiskIoRequestContext* reader, ScanRange* range, - uint8_t* buffer, int64_t buffer_len, MemTracker* mem_tracker) { - DCHECK(io_mgr_ != nullptr); - DCHECK(buffer_ == nullptr); - DCHECK(range != nullptr); +DiskIoMgr::BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr, + DiskIoRequestContext* reader, ScanRange* scan_range, uint8_t* buffer, + int64_t buffer_len, MemTracker* mem_tracker) + : io_mgr_(io_mgr), + reader_(reader), + mem_tracker_(mem_tracker), + scan_range_(scan_range), + buffer_(buffer), + buffer_len_(buffer_len) { + DCHECK(io_mgr != nullptr); + DCHECK(scan_range != nullptr); DCHECK(buffer != nullptr); DCHECK_GE(buffer_len, 0); - DCHECK_NE(range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER, + DCHECK_NE(scan_range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER, mem_tracker == nullptr); - reader_ = reader; - scan_range_ = range; - mem_tracker_ = mem_tracker; - buffer_ = buffer; - buffer_len_ = buffer_len; - len_ = 0; - eosr_ = false; - status_ = Status::OK(); - scan_range_offset_ = 0; } void DiskIoMgr::BufferDescriptor::TransferOwnership(MemTracker* dst) { @@ -244,11 +224,6 @@ void 
DiskIoMgr::BufferDescriptor::TransferOwnership(MemTracker* dst) { mem_tracker_ = dst; } -void DiskIoMgr::BufferDescriptor::Return() { - DCHECK(io_mgr_ != nullptr); - io_mgr_->ReturnBuffer(this); -} - DiskIoMgr::WriteRange::WriteRange( const string& file, int64_t file_offset, int disk_id, WriteDoneCallback callback) : RequestRange(RequestType::WRITE), callback_(callback) { @@ -630,7 +605,7 @@ Status DiskIoMgr::GetNextRange(DiskIoRequestContext* reader, ScanRange** range) } Status DiskIoMgr::Read(DiskIoRequestContext* reader, - ScanRange* range, BufferDescriptor** buffer) { + ScanRange* range, std::unique_ptr<BufferDescriptor>* buffer) { DCHECK(range != nullptr); DCHECK(buffer != nullptr); *buffer = nullptr; @@ -651,7 +626,7 @@ Status DiskIoMgr::Read(DiskIoRequestContext* reader, return Status::OK(); } -void DiskIoMgr::ReturnBuffer(BufferDescriptor* buffer_desc) { +void DiskIoMgr::ReturnBuffer(unique_ptr<BufferDescriptor> buffer_desc) { DCHECK(buffer_desc != nullptr); if (!buffer_desc->status_.ok()) DCHECK(buffer_desc->buffer_ == nullptr); @@ -659,8 +634,9 @@ void DiskIoMgr::ReturnBuffer(BufferDescriptor* buffer_desc) { if (buffer_desc->buffer_ != nullptr) { if (!buffer_desc->is_cached() && !buffer_desc->is_client_buffer()) { // Buffers the were not allocated by DiskIoMgr don't need to be freed. - FreeBufferMemory(buffer_desc); + FreeBufferMemory(buffer_desc.get()); } + buffer_desc->buffer_ = nullptr; num_buffers_in_readers_.Add(-1); reader->num_buffers_in_reader_.Add(-1); } else { @@ -674,36 +650,10 @@ void DiskIoMgr::ReturnBuffer(BufferDescriptor* buffer_desc) { // Close() is idempotent so multiple cancelled buffers is okay. 
buffer_desc->scan_range_->Close(); } - ReturnBufferDesc(buffer_desc); -} - -void DiskIoMgr::ReturnBufferDesc(BufferDescriptor* desc) { - DCHECK(desc != nullptr); - desc->Reset(); - unique_lock<mutex> lock(free_buffers_lock_); - DCHECK(find(free_buffer_descs_.begin(), free_buffer_descs_.end(), desc) - == free_buffer_descs_.end()); - free_buffer_descs_.push_back(desc); -} - -DiskIoMgr::BufferDescriptor* DiskIoMgr::GetBufferDesc(DiskIoRequestContext* reader, - MemTracker* mem_tracker, ScanRange* range, uint8_t* buffer, int64_t buffer_size) { - BufferDescriptor* buffer_desc; - { - unique_lock<mutex> lock(free_buffers_lock_); - if (free_buffer_descs_.empty()) { - buffer_desc = pool_.Add(new BufferDescriptor(this)); - } else { - buffer_desc = free_buffer_descs_.front(); - free_buffer_descs_.pop_front(); - } - } - buffer_desc->Reset(reader, range, buffer, buffer_size, mem_tracker); - return buffer_desc; } -DiskIoMgr::BufferDescriptor* DiskIoMgr::GetFreeBuffer(DiskIoRequestContext* reader, - ScanRange* range, int64_t buffer_size) { +unique_ptr<DiskIoMgr::BufferDescriptor> DiskIoMgr::GetFreeBuffer( + DiskIoRequestContext* reader, ScanRange* range, int64_t buffer_size) { DCHECK_LE(buffer_size, max_buffer_size_); DCHECK_GT(buffer_size, 0); buffer_size = min(static_cast<int64_t>(max_buffer_size_), buffer_size); @@ -744,7 +694,8 @@ DiskIoMgr::BufferDescriptor* DiskIoMgr::GetFreeBuffer(DiskIoRequestContext* read DCHECK(range != nullptr); DCHECK(reader != nullptr); DCHECK(buffer != nullptr); - return GetBufferDesc(reader, reader->mem_tracker_, range, buffer, buffer_size); + return unique_ptr<BufferDescriptor>(new BufferDescriptor( + this, reader, range, buffer, buffer_size, reader->mem_tracker_)); } void DiskIoMgr::GcIoBuffers(int64_t bytes_to_free) { @@ -753,7 +704,7 @@ void DiskIoMgr::GcIoBuffers(int64_t bytes_to_free) { int bytes_freed = 0; // Free small-to-large to avoid retaining many small buffers and fragmenting memory. 
for (int idx = 0; idx < free_buffers_.size(); ++idx) { - std::list<uint8_t*>* free_buffers = &free_buffers_[idx]; + deque<uint8_t*>* free_buffers = &free_buffers_[idx]; while ( !free_buffers->empty() && (bytes_to_free == -1 || bytes_freed <= bytes_to_free)) { uint8_t* buffer = free_buffers->front(); @@ -972,7 +923,7 @@ void DiskIoMgr::HandleWriteFinished( } void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext* reader, - BufferDescriptor* buffer) { + unique_ptr<BufferDescriptor> buffer) { unique_lock<mutex> reader_lock(reader->lock_); DiskIoRequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id]; @@ -983,11 +934,12 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext* if (reader->state_ == DiskIoRequestContext::Cancelled) { state.DecrementRequestThreadAndCheckDone(reader); DCHECK(reader->Validate()) << endl << reader->DebugString(); - if (!buffer->is_client_buffer()) FreeBufferMemory(buffer); + if (!buffer->is_client_buffer()) FreeBufferMemory(buffer.get()); buffer->buffer_ = nullptr; - buffer->scan_range_->Cancel(reader->status_); + ScanRange* scan_range = buffer->scan_range_; + scan_range->Cancel(reader->status_); // Enqueue the buffer to use the scan range's buffer cleanup path. - buffer->scan_range_->EnqueueBuffer(buffer); + scan_range->EnqueueBuffer(move(buffer)); return; } @@ -1000,7 +952,7 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext* // 3. 
Middle of scan range if (!buffer->status_.ok()) { // Error case - if (!buffer->is_client_buffer()) FreeBufferMemory(buffer); + if (!buffer->is_client_buffer()) FreeBufferMemory(buffer.get()); buffer->buffer_ = nullptr; buffer->eosr_ = true; --state.num_remaining_ranges(); @@ -1014,7 +966,7 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext* bool eosr = buffer->eosr_; ScanRange* scan_range = buffer->scan_range_; bool is_cached = buffer->is_cached(); - bool queue_full = scan_range->EnqueueBuffer(buffer); + bool queue_full = scan_range->EnqueueBuffer(move(buffer)); if (eosr) { // For cached buffers, we can't close the range until the cached buffer is returned. // Close() is called from DiskIoMgr::ReturnBuffer(). @@ -1063,14 +1015,14 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) { // This function reads the specified scan range associated with the // specified reader context and disk queue. -void DiskIoMgr::ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader, - ScanRange* range) { +void DiskIoMgr::ReadRange( + DiskQueue* disk_queue, DiskIoRequestContext* reader, ScanRange* range) { int64_t bytes_remaining = range->len_ - range->bytes_read_; DCHECK_GT(bytes_remaining, 0); - BufferDescriptor* buffer_desc = nullptr; + unique_ptr<BufferDescriptor> buffer_desc; if (range->external_buffer_tag_ == ScanRange::ExternalBufferTag::CLIENT_BUFFER) { - buffer_desc = GetBufferDesc( - reader, nullptr, range, range->client_buffer_.data, range->client_buffer_.len); + buffer_desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(this, reader, range, + range->client_buffer_.data, range->client_buffer_.len, nullptr)); } else { // Need to allocate a buffer to read into. 
int64_t buffer_size = ::min(bytes_remaining, static_cast<int64_t>(max_buffer_size_)); @@ -1109,10 +1061,10 @@ void DiskIoMgr::ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader, } // Finished read, update reader/disk based on the results - HandleReadFinished(disk_queue, reader, buffer_desc); + HandleReadFinished(disk_queue, reader, move(buffer_desc)); } -DiskIoMgr::BufferDescriptor* DiskIoMgr::TryAllocateNextBufferForRange( +unique_ptr<DiskIoMgr::BufferDescriptor> DiskIoMgr::TryAllocateNextBufferForRange( DiskQueue* disk_queue, DiskIoRequestContext* reader, ScanRange* range, int64_t buffer_size) { DCHECK(reader->mem_tracker_ != nullptr); @@ -1149,7 +1101,7 @@ DiskIoMgr::BufferDescriptor* DiskIoMgr::TryAllocateNextBufferForRange( // now. } } - BufferDescriptor* buffer_desc = GetFreeBuffer(reader, range, buffer_size); + unique_ptr<BufferDescriptor> buffer_desc = GetFreeBuffer(reader, range, buffer_size); DCHECK(buffer_desc != nullptr); return buffer_desc; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3480c892/be/src/runtime/disk-io-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h index aae19ee..9621c92 100644 --- a/be/src/runtime/disk-io-mgr.h +++ b/be/src/runtime/disk-io-mgr.h @@ -18,8 +18,8 @@ #ifndef IMPALA_RUNTIME_DISK_IO_MGR_H #define IMPALA_RUNTIME_DISK_IO_MGR_H +#include <deque> #include <functional> -#include <list> #include <vector> #include <boost/scoped_ptr.hpp> @@ -146,9 +146,9 @@ class MemTracker; /// caller when constructing the scan range. /// /// As a caller reads from a scan range, these buffers are wrapped in BufferDescriptors -/// and returned to the caller. The caller must always call Return() on the buffer -/// descriptor when it when it is done to allow recycling of the buffer descriptor and -/// the associated buffer (if there is an IoMgr-allocated or HDFS cached buffer). +/// and returned to the caller. 
The caller must always call ReturnBuffer() on the buffer +/// descriptor to allow recycling of the associated buffer (if there is an +/// IoMgr-allocated or HDFS cached buffer). /// /// Caching support: /// Scan ranges contain metadata on whether or not it is cached on the DN. In that @@ -169,7 +169,7 @@ class MemTracker; /// the number of scanner threads properly controls the amount of files we mlock. /// With cached scan ranges, we cannot close the scan range until the cached buffer /// is returned (HDFS does not allow this). We therefore need to defer the close until -/// the cached buffer is returned (BufferDescriptor::Return()). +/// the cached buffer is returned (ReturnBuffer()). // /// Remote filesystem support (e.g. S3): /// Remote filesystems are modeled as "remote disks". That is, there is a seperate disk @@ -208,6 +208,10 @@ class DiskIoMgr : public CacheLineAligned { /// time. class BufferDescriptor { public: + ~BufferDescriptor() { + DCHECK(buffer_ == nullptr); // Check we didn't leak a buffer. + } + ScanRange* scan_range() { return scan_range_; } uint8_t* buffer() { return buffer_; } int64_t buffer_len() { return buffer_len_; } @@ -225,15 +229,16 @@ class DiskIoMgr : public CacheLineAligned { /// TODO: IMPALA-3209: revisit this as part of scanner memory usage revamp. void TransferOwnership(MemTracker* dst); - /// Returns the buffer to the IoMgr. This must be called for every buffer - /// returned by GetNext()/Read() that did not return an error. This is non-blocking. - /// After calling this, the buffer descriptor is invalid and cannot be accessed. - void Return(); - private: friend class DiskIoMgr; + friend class DiskIoMgr::ScanRange; friend class DiskIoRequestContext; - BufferDescriptor(DiskIoMgr* io_mgr); + + /// Create a buffer descriptor for a new reader, range and data buffer. The buffer + /// memory should already be accounted against 'mem_tracker'. 
+ BufferDescriptor(DiskIoMgr* io_mgr, DiskIoRequestContext* reader, + ScanRange* scan_range, uint8_t* buffer, int64_t buffer_len, + MemTracker* mem_tracker); /// Return true if this is a cached buffer owned by HDFS. bool is_cached() const { @@ -248,42 +253,34 @@ class DiskIoMgr : public CacheLineAligned { == ScanRange::ExternalBufferTag::CLIENT_BUFFER; } - /// Reset the buffer descriptor to an uninitialized state. - void Reset(); - - /// Resets the buffer descriptor state for a new reader, range and data buffer. - /// The buffer memory should already be accounted against MemTracker - void Reset(DiskIoRequestContext* reader, ScanRange* range, uint8_t* buffer, - int64_t buffer_len, MemTracker* mem_tracker); - DiskIoMgr* const io_mgr_; /// Reader that this buffer is for. - DiskIoRequestContext* reader_; + DiskIoRequestContext* const reader_; /// The current tracker this buffer is associated with. After initialisation, /// NULL for cached buffers and non-NULL for all other buffers. MemTracker* mem_tracker_; /// Scan range that this buffer is for. Non-NULL when initialised. - ScanRange* scan_range_; + ScanRange* const scan_range_; /// buffer with the read contents uint8_t* buffer_; /// length of buffer_. For buffers from cached reads, the length is 0. - int64_t buffer_len_; + const int64_t buffer_len_; /// length of read contents - int64_t len_; + int64_t len_ = 0; /// true if the current scan range is complete - bool eosr_; + bool eosr_ = false; /// Status of the read to this buffer. if status is not ok, 'buffer' is nullptr Status status_; - int64_t scan_range_offset_; + int64_t scan_range_offset_ = 0; }; /// The request type, read or write associated with a request range. @@ -411,7 +408,7 @@ class DiskIoMgr : public CacheLineAligned { /// called when all buffers have been returned, *buffer is set to nullptr and Status::OK /// is returned. /// Only one thread can be in GetNext() at any time. 
- Status GetNext(BufferDescriptor** buffer) WARN_UNUSED_RESULT; + Status GetNext(std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT; /// Cancel this scan range. This cleans up all queued buffers and /// wakes up any threads blocked on GetNext(). @@ -435,7 +432,7 @@ class DiskIoMgr : public CacheLineAligned { /// Returns true if this scan range has hit the queue capacity, false otherwise. /// The caller passes ownership of buffer to the scan range and it is not /// valid to access buffer after this call. - bool EnqueueBuffer(BufferDescriptor* buffer); + bool EnqueueBuffer(std::unique_ptr<BufferDescriptor> buffer); /// Cleanup any queued buffers (i.e. due to cancellation). This cannot /// be called with any locks taken. @@ -566,7 +563,7 @@ class DiskIoMgr : public CacheLineAligned { /// IO buffers that are queued for this scan range. /// Condition variable for GetNext boost::condition_variable buffer_ready_cv_; - std::list<BufferDescriptor*> ready_buffers_; + std::deque<std::unique_ptr<BufferDescriptor>> ready_buffers_; /// The soft capacity limit for ready_buffers_. ready_buffers_ can exceed /// the limit temporarily as the capacity is adjusted dynamically. @@ -711,7 +708,12 @@ class DiskIoMgr : public CacheLineAligned { /// This can only be used if the scan range fits in a single IO buffer (i.e. is smaller /// than max_read_buffer_size()) or if reading into a client-provided buffer. Status Read(DiskIoRequestContext* reader, ScanRange* range, - BufferDescriptor** buffer) WARN_UNUSED_RESULT; + std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT; + + /// Returns the buffer to the IoMgr. This must be called for every buffer + /// returned by GetNext()/Read() that did not return an error. This is non-blocking. + /// After calling this, the buffer descriptor is invalid and cannot be accessed. + void ReturnBuffer(std::unique_ptr<BufferDescriptor> buffer); /// Determine which disk queue this file should be assigned to. 
Returns an index into /// disk_queues_. The disk_id is the volume ID for the local disk that holds the @@ -810,9 +812,6 @@ class DiskIoMgr : public CacheLineAligned { friend class DiskIoMgrTest_Buffers_Test; - /// Pool to allocate BufferDescriptors. - ObjectPool pool_; - /// Memory tracker for unused I/O buffers owned by DiskIoMgr. boost::scoped_ptr<MemTracker> free_buffer_mem_tracker_; @@ -853,7 +852,7 @@ class DiskIoMgr : public CacheLineAligned { /// contention. boost::scoped_ptr<RequestContextCache> request_context_cache_; - /// Protects free_buffers_ and free_buffer_descs_ + /// Protects free_buffers_ boost::mutex free_buffers_lock_; /// Free buffers that can be handed out to clients. There is one list for each buffer @@ -867,10 +866,7 @@ class DiskIoMgr : public CacheLineAligned { /// free_buffers_[10] => list of free buffers with size 1 MB /// free_buffers_[13] => list of free buffers with size 8 MB /// free_buffers_[n] => list of free buffers with size 2^n * 1024 B - std::vector<std::list<uint8_t*>> free_buffers_; - - /// List of free buffer desc objects that can be handed out to clients - std::list<BufferDescriptor*> free_buffer_descs_; + std::vector<std::deque<uint8_t*>> free_buffers_; /// Total number of allocated buffers, used for debugging. AtomicInt32 num_allocated_buffers_; @@ -905,21 +901,8 @@ class DiskIoMgr : public CacheLineAligned { /// The returned *buffer_size must be between 0 and 'max_buffer_size_'. /// The buffer memory is tracked against reader's mem tracker, or /// 'unowned_buffer_mem_tracker_' if the reader does not have one. - BufferDescriptor* GetFreeBuffer(DiskIoRequestContext* reader, ScanRange* range, - int64_t buffer_size); - - /// Gets a BufferDescriptor initialized with the provided parameters. The object may be - /// recycled or newly allocated. Does not do anything aside from initialize the - /// descriptor's fields. 
- BufferDescriptor* GetBufferDesc(DiskIoRequestContext* reader, MemTracker* mem_tracker, - ScanRange* range, uint8_t* buffer, int64_t buffer_size); - - /// Returns the buffer desc and underlying buffer to the disk IoMgr. This also updates - /// the reader and disk queue state. - void ReturnBuffer(BufferDescriptor* buffer); - - /// Returns a buffer desc object which can now be used for another reader. - void ReturnBufferDesc(BufferDescriptor* desc); + std::unique_ptr<BufferDescriptor> GetFreeBuffer( + DiskIoRequestContext* reader, ScanRange* range, int64_t buffer_size); /// Disassociates the desc->buffer_ memory from 'desc' (which cannot be nullptr), either /// freeing it or returning it to 'free_buffers_'. Memory tracking is updated to @@ -941,7 +924,8 @@ class DiskIoMgr : public CacheLineAligned { /// Updates disk queue and reader state after a read is complete. The read result /// is captured in the buffer descriptor. - void HandleReadFinished(DiskQueue*, DiskIoRequestContext*, BufferDescriptor*); + void HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext* reader, + std::unique_ptr<BufferDescriptor> buffer); /// Invokes write_range->callback_ after the range has been written and /// updates per-disk state and handle state. The status of the write OK/RUNTIME_ERROR @@ -971,10 +955,9 @@ class DiskIoMgr : public CacheLineAligned { /// if successful. If 'reader' is cancelled, cancels the range and returns nullptr. /// If there is memory pressure and buffers are already queued, adds the range /// to the blocked ranges and returns nullptr. 
- BufferDescriptor* TryAllocateNextBufferForRange(DiskQueue* disk_queue, + std::unique_ptr<BufferDescriptor> TryAllocateNextBufferForRange(DiskQueue* disk_queue, DiskIoRequestContext* reader, ScanRange* range, int64_t buffer_size); }; - } #endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3480c892/be/src/runtime/row-batch.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc index 7f200dc..7668063 100644 --- a/be/src/runtime/row-batch.cc +++ b/be/src/runtime/row-batch.cc @@ -18,6 +18,7 @@ #include "runtime/row-batch.h" #include <stdint.h> // for intptr_t +#include <memory> #include <boost/scoped_ptr.hpp> #include "gen-cpp/Results_types.h" @@ -153,7 +154,7 @@ RowBatch::RowBatch( RowBatch::~RowBatch() { tuple_data_pool_.FreeAll(); for (int i = 0; i < io_buffers_.size(); ++i) { - io_buffers_[i]->Return(); + ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(io_buffers_[i])); } for (int i = 0; i < blocks_.size(); ++i) { blocks_[i]->Delete(); @@ -295,11 +296,11 @@ void RowBatch::SerializeInternal(int64_t size, DedupMap* distinct_tuples, DCHECK_EQ(offset, size); } -void RowBatch::AddIoBuffer(DiskIoMgr::BufferDescriptor* buffer) { +void RowBatch::AddIoBuffer(unique_ptr<DiskIoMgr::BufferDescriptor> buffer) { DCHECK(buffer != NULL); - io_buffers_.push_back(buffer); auxiliary_mem_usage_ += buffer->buffer_len(); buffer->TransferOwnership(mem_tracker_); + io_buffers_.emplace_back(move(buffer)); } void RowBatch::AddBlock(BufferedBlockMgr::Block* block, FlushMode flush) { @@ -326,7 +327,7 @@ void RowBatch::Reset() { // TODO: Change this to Clear() and investigate the repercussions. 
tuple_data_pool_.FreeAll(); for (int i = 0; i < io_buffers_.size(); ++i) { - io_buffers_[i]->Return(); + ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(io_buffers_[i])); } io_buffers_.clear(); for (int i = 0; i < blocks_.size(); ++i) { @@ -349,7 +350,7 @@ void RowBatch::Reset() { void RowBatch::TransferResourceOwnership(RowBatch* dest) { dest->tuple_data_pool_.AcquireData(&tuple_data_pool_, false); for (int i = 0; i < io_buffers_.size(); ++i) { - dest->AddIoBuffer(io_buffers_[i]); + dest->AddIoBuffer(move(io_buffers_[i])); } io_buffers_.clear(); for (int i = 0; i < blocks_.size(); ++i) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3480c892/be/src/runtime/row-batch.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h index e4b7191..0068478 100644 --- a/be/src/runtime/row-batch.h +++ b/be/src/runtime/row-batch.h @@ -214,7 +214,7 @@ class RowBatch { void Reset(); /// Add io buffer to this row batch. - void AddIoBuffer(DiskIoMgr::BufferDescriptor* buffer); + void AddIoBuffer(std::unique_ptr<DiskIoMgr::BufferDescriptor> buffer); /// Adds a block to this row batch. The block must be pinned. The blocks must be /// deleted when freeing resources. The block's memory remains accounted against @@ -432,7 +432,7 @@ class RowBatch { /// IO buffers current owned by this row batch. Ownership of IO buffers transfer /// between row batches. Any IO buffer will be owned by at most one row batch /// (i.e. they are not ref counted) so most row batches don't own any. - std::vector<DiskIoMgr::BufferDescriptor*> io_buffers_; + std::vector<std::unique_ptr<DiskIoMgr::BufferDescriptor>> io_buffers_; /// Blocks attached to this row batch. The underlying memory and block manager client /// are owned by the BufferedBlockMgr. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3480c892/be/src/runtime/tmp-file-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc index bc09f2e..c99077f 100644 --- a/be/src/runtime/tmp-file-mgr.cc +++ b/be/src/runtime/tmp-file-mgr.cc @@ -402,7 +402,7 @@ Status TmpFileMgr::FileGroup::WaitForAsyncRead(WriteHandle* handle, MemRange buf // Don't grab handle->write_state_lock_, it is safe to touch all of handle's state // since the write is not in flight. SCOPED_TIMER(disk_read_timer_); - DiskIoMgr::BufferDescriptor* io_mgr_buffer = nullptr; + unique_ptr<DiskIoMgr::BufferDescriptor> io_mgr_buffer; Status status = handle->read_range_->GetNext(&io_mgr_buffer); if (!status.ok()) goto exit; DCHECK(io_mgr_buffer != NULL); @@ -423,7 +423,7 @@ Status TmpFileMgr::FileGroup::WaitForAsyncRead(WriteHandle* handle, MemRange buf } exit: // Always return the buffer before exiting to avoid leaking it. - if (io_mgr_buffer != nullptr) io_mgr_buffer->Return(); + if (io_mgr_buffer != nullptr) io_mgr_->ReturnBuffer(move(io_mgr_buffer)); handle->read_range_ = nullptr; return status; }
