This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1fc5515a78 [enhancement](memory) Remove unused reservation tracker (#11969)
1fc5515a78 is described below
commit 1fc5515a780a00ad96644ab92a46e83c57621ea8
Author: Xinyi Zou <[email protected]>
AuthorDate: Wed Aug 24 08:49:34 2022 +0800
[enhancement](memory) Remove unused reservation tracker (#11969)
---
be/src/exec/exec_node.cpp | 13 +-
be/src/exec/exec_node.h | 5 -
be/src/exec/partitioned_aggregation_node.cc | 58 +--
be/src/exec/partitioned_aggregation_node.h | 24 --
be/src/runtime/CMakeLists.txt | 3 -
be/src/runtime/buffered_tuple_stream3.cc | 224 +-----------
be/src/runtime/buffered_tuple_stream3.h | 52 +--
be/src/runtime/bufferpool/buffer_pool.cc | 101 +-----
be/src/runtime/bufferpool/buffer_pool.h | 78 +---
be/src/runtime/bufferpool/buffer_pool_internal.h | 20 +-
be/src/runtime/bufferpool/reservation_tracker.cc | 401 ---------------------
be/src/runtime/bufferpool/reservation_tracker.h | 290 ---------------
.../bufferpool/reservation_tracker_counters.h | 38 --
be/src/runtime/bufferpool/reservation_util.cc | 40 --
be/src/runtime/bufferpool/reservation_util.h | 71 ----
be/src/runtime/bufferpool/suballocator.cc | 5 -
be/src/runtime/exec_env.h | 5 +-
be/src/runtime/exec_env_init.cpp | 4 -
be/src/runtime/initial_reservations.cc | 83 -----
be/src/runtime/initial_reservations.h | 78 ----
be/src/runtime/memory/mem_tracker_limiter.cpp | 45 +--
be/src/runtime/runtime_state.cpp | 56 +--
be/src/runtime/runtime_state.h | 36 --
be/test/exec/tablet_sink_test.cpp | 3 -
be/test/runtime/test_env.cc | 4 +-
be/test/util/arrow/arrow_work_flow_test.cpp | 3 -
be/test/vec/exec/vtablet_sink_test.cpp | 3 -
27 files changed, 52 insertions(+), 1691 deletions(-)
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 66827019f9..88cbb8cdf1 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -53,7 +53,6 @@
#include "odbc_scan_node.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
-#include "runtime/initial_reservations.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
@@ -286,9 +285,6 @@ Status ExecNode::close(RuntimeState* state) {
}
if (_buffer_pool_client.is_registered()) {
- VLOG_FILE << _id << " returning reservation " <<
_resource_profile.min_reservation;
- state->initial_reservations()->Return(&_buffer_pool_client,
-
_resource_profile.min_reservation);
state->exec_env()->buffer_pool()->DeregisterClient(&_buffer_pool_client);
}
@@ -727,11 +723,8 @@ Status ExecNode::claim_buffer_reservation(RuntimeState*
state) {
}
ss << print_plan_node_type(_type) << " id=" << _id << " ptr=" << this;
- RETURN_IF_ERROR(buffer_pool->RegisterClient(ss.str(),
state->instance_buffer_reservation(),
-
buffer_pool->GetSystemBytesLimit(),
- runtime_profile(),
&_buffer_pool_client));
+ RETURN_IF_ERROR(buffer_pool->RegisterClient(ss.str(), runtime_profile(),
&_buffer_pool_client));
- state->initial_reservations()->Claim(&_buffer_pool_client,
_resource_profile.min_reservation);
/*
if (debug_action_ == TDebugAction::SET_DENY_RESERVATION_PROBABILITY &&
(debug_phase_ == TExecNodePhase::PREPARE || debug_phase_ ==
TExecNodePhase::OPEN)) {
@@ -744,10 +737,6 @@ Status ExecNode::claim_buffer_reservation(RuntimeState*
state) {
return Status::OK();
}
-Status ExecNode::release_unused_reservation() {
- return
_buffer_pool_client.DecreaseReservationTo(_resource_profile.min_reservation);
-}
-
void ExecNode::release_block_memory(vectorized::Block& block, uint16_t
child_idx) {
DCHECK(child_idx < _children.size());
block.clear_column_data(child(child_idx)->row_desc().num_materialized_slots());
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 101636f0e8..6695faf7e7 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -210,11 +210,6 @@ protected:
/// as the initial reservation is not released before Close().
Status claim_buffer_reservation(RuntimeState* state);
- /// Release any unused reservation in excess of the node's initial
reservation. Returns
- /// an error if releasing the reservation requires flushing pages to disk,
and that
- /// fails.
- Status release_unused_reservation();
-
/// Release all memory of block which got from child. The block
// 1. clear mem of valid column get from child, make sure child can reuse
the mem
// 2. delete and release the column which create by function all and other
reason
diff --git a/be/src/exec/partitioned_aggregation_node.cc
b/be/src/exec/partitioned_aggregation_node.cc
index 12b648f6b2..16332151b0 100644
--- a/be/src/exec/partitioned_aggregation_node.cc
+++ b/be/src/exec/partitioned_aggregation_node.cc
@@ -253,7 +253,6 @@ Status PartitionedAggregationNode::open(RuntimeState*
state) {
// Claim reservation after the child has been opened to reduce the peak
reservation
// requirement.
if (!_buffer_pool_client.is_registered() && !grouping_exprs_.empty()) {
- DCHECK_GE(_resource_profile.min_reservation, MinReservation());
RETURN_IF_ERROR(claim_buffer_reservation(state));
}
@@ -278,12 +277,7 @@ Status PartitionedAggregationNode::open(RuntimeState*
state) {
state, &intermediate_row_desc_, &_buffer_pool_client,
_resource_profile.spillable_buffer_size));
RETURN_IF_ERROR(serialize_stream_->Init(id(), false));
- bool got_buffer;
- // Reserve the memory for 'serialize_stream_' so we don't need
to scrounge up
- // another buffer during spilling.
-
RETURN_IF_ERROR(serialize_stream_->PrepareForWrite(&got_buffer));
- // DCHECK(got_buffer)
- // << "Accounted in min reservation" <<
_buffer_pool_client.DebugString();
+ RETURN_IF_ERROR(serialize_stream_->PrepareForWrite());
DCHECK(serialize_stream_->has_write_iterator());
}
}
@@ -743,13 +737,7 @@ Status
PartitionedAggregationNode::Partition::InitStreams() {
parent->state_, &parent->intermediate_row_desc_,
&parent->_buffer_pool_client,
parent->_resource_profile.spillable_buffer_size,
external_varlen_slots));
RETURN_IF_ERROR(aggregated_row_stream->Init(parent->id(), true));
- bool got_buffer;
- RETURN_IF_ERROR(aggregated_row_stream->PrepareForWrite(&got_buffer));
- // TODO(zxy) If exec_mem_limit is very small, DCHECK(false) will occur,
the logic of
- // reservation tracker needs to be deleted or refactored
- // DCHECK(got_buffer) << "Buffer included in reservation " << parent->_id
<< "\n"
- // << parent->_buffer_pool_client.DebugString() << "\n"
- // << parent->DebugString(2);
+ RETURN_IF_ERROR(aggregated_row_stream->PrepareForWrite());
if (!parent->is_streaming_preagg_) {
unaggregated_row_stream.reset(new BufferedTupleStream3(
@@ -828,9 +816,7 @@ Status
PartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
parent->_resource_profile.spillable_buffer_size));
status = parent->serialize_stream_->Init(parent->id(), false);
if (status.ok()) {
- bool got_buffer;
- status = parent->serialize_stream_->PrepareForWrite(&got_buffer);
- // DCHECK(!status.ok() || got_buffer) << "Accounted in min
reservation";
+ status = parent->serialize_stream_->PrepareForWrite();
}
if (!status.ok()) {
hash_tbl->Close();
@@ -873,10 +859,7 @@ Status PartitionedAggregationNode::Partition::Spill(bool
more_aggregate_rows) {
//
aggregated_row_stream->UnpinStream(BufferedTupleStream3::UNPIN_ALL_EXCEPT_CURRENT);
} else {
//
aggregated_row_stream->UnpinStream(BufferedTupleStream3::UNPIN_ALL);
- bool got_buffer;
- RETURN_IF_ERROR(unaggregated_row_stream->PrepareForWrite(&got_buffer));
- // DCHECK(got_buffer) << "Accounted in min reservation"
- // << parent->_buffer_pool_client.DebugString();
+ RETURN_IF_ERROR(unaggregated_row_stream->PrepareForWrite());
}
COUNTER_UPDATE(parent->num_spilled_partitions_, 1);
@@ -1196,16 +1179,6 @@ Status
PartitionedAggregationNode::CheckAndResizeHashPartitions(
Status PartitionedAggregationNode::NextPartition() {
DCHECK(output_partition_ == nullptr);
- if (!is_in_subplan() && spilled_partitions_.empty()) {
- // All partitions are in memory. Release reservation that was used for
previous
- // partitions that is no longer needed. If we have spilled partitions,
we want to
- // hold onto all reservation in case it is needed to process the
spilled partitions.
- DCHECK(!_buffer_pool_client.has_unpinned_pages());
- Status status = release_unused_reservation();
- DCHECK(status.ok()) << "Should not fail - all partitions are in memory
so there are "
- << "no unpinned pages. " << status.get_error_msg();
- }
-
// Keep looping until we get to a partition that fits in memory.
Partition* partition = nullptr;
while (true) {
@@ -1217,12 +1190,6 @@ Status PartitionedAggregationNode::NextPartition() {
break;
}
- // No aggregated partitions in memory - we should not be using any
reservation aside
- // from 'serialize_stream_'.
- DCHECK_EQ(serialize_stream_ != nullptr ?
serialize_stream_->BytesPinned(false) : 0,
- _buffer_pool_client.GetUsedReservation())
- << _buffer_pool_client.DebugString();
-
// Try to fit a single spilled partition in memory. We can often do
this because
// we only need to fit 1/PARTITION_FANOUT of the data in memory.
// TODO: in some cases when the partition probably won't fit in memory
it could
@@ -1281,11 +1248,6 @@ Status
PartitionedAggregationNode::BuildSpilledPartition(Partition** built_parti
if (dst_partition->is_spilled()) {
PushSpilledPartition(dst_partition);
*built_partition = nullptr;
- // Spilled the partition - we should not be using any reservation
except from
- // 'serialize_stream_'.
- DCHECK_EQ(serialize_stream_ != nullptr ?
serialize_stream_->BytesPinned(false) : 0,
- _buffer_pool_client.GetUsedReservation())
- << _buffer_pool_client.DebugString();
} else {
*built_partition = dst_partition;
}
@@ -1315,9 +1277,7 @@ Status
PartitionedAggregationNode::RepartitionSpilledPartition() {
// The aggregated rows have been repartitioned. Free up at least a
buffer's worth of
// reservation and use it to pin the unaggregated write buffer.
//
hash_partition->aggregated_row_stream->UnpinStream(BufferedTupleStream3::UNPIN_ALL);
- bool got_buffer;
-
RETURN_IF_ERROR(hash_partition->unaggregated_row_stream->PrepareForWrite(&got_buffer));
- // DCHECK(got_buffer) << "Accounted in min reservation" <<
_buffer_pool_client.DebugString();
+
RETURN_IF_ERROR(hash_partition->unaggregated_row_stream->PrepareForWrite());
}
RETURN_IF_ERROR(ProcessStream<false>(partition->unaggregated_row_stream.get()));
@@ -1339,13 +1299,7 @@ template <bool AGGREGATED_ROWS>
Status PartitionedAggregationNode::ProcessStream(BufferedTupleStream3*
input_stream) {
DCHECK(!is_streaming_preagg_);
if (input_stream->num_rows() > 0) {
- while (true) {
- bool got_buffer = false;
- RETURN_IF_ERROR(input_stream->PrepareForRead(true, &got_buffer));
- if (got_buffer) break;
- // Did not have a buffer to read the input stream. Spill and try
again.
- RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
- }
+ RETURN_IF_ERROR(input_stream->PrepareForRead(true));
bool eos = false;
const RowDescriptor* desc =
diff --git a/be/src/exec/partitioned_aggregation_node.h
b/be/src/exec/partitioned_aggregation_node.h
index c4f95d5036..c5d9a505d1 100644
--- a/be/src/exec/partitioned_aggregation_node.h
+++ b/be/src/exec/partitioned_aggregation_node.h
@@ -691,30 +691,6 @@ private:
/// Calls finalizes on all tuples starting at 'it'.
void CleanupHashTbl(const std::vector<NewAggFnEvaluator*>& agg_fn_evals,
PartitionedHashTable::Iterator it);
-
- /// Compute minimum buffer reservation for grouping aggregations.
- /// We need one buffer per partition, which is used either as the write
buffer for the
- /// aggregated stream or the unaggregated stream. We need an additional
buffer to read
- /// the stream we are currently repartitioning. The read buffer needs to
be a max-sized
- /// buffer to hold a max-sized row and we need one max-sized write buffer
that is used
- /// temporarily to append a row to any stream.
- ///
- /// If we need to serialize, we need an additional buffer while spilling a
partition
- /// as the partitions aggregate stream needs to be serialized and
rewritten.
- /// We do not spill streaming preaggregations, so we do not need to
reserve any buffers.
- int64_t MinReservation() const {
- //DCHECK(!grouping_exprs_.empty());
- // Must be kept in sync with
AggregationNode.computeNodeResourceProfile() in fe.
- //if (is_streaming_preagg_) {
- // Reserve at least one buffer and a 64kb hash table per partition.
- // return (_resource_profile.spillable_buffer_size + 64 * 1024) *
PARTITION_FANOUT;
- //}
- //int num_buffers = PARTITION_FANOUT + 1 + (needs_serialize_ ? 1 : 0);
- // Two of the buffers must fit the maximum row.
- //return _resource_profile.spillable_buffer_size * (num_buffers - 2) +
- //_resource_profile.max_row_buffer_size * 2;
- return 0;
- }
};
} // namespace doris
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 5ff765f137..39769f323e 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -75,11 +75,8 @@ set(RUNTIME_FILES
tablets_channel.cpp
bufferpool/buffer_allocator.cc
bufferpool/buffer_pool.cc
- bufferpool/reservation_tracker.cc
- bufferpool/reservation_util.cc
bufferpool/suballocator.cc
bufferpool/system_allocator.cc
- initial_reservations.cc
snapshot_loader.cpp
query_statistics.cpp
message_body_sink.cpp
diff --git a/be/src/runtime/buffered_tuple_stream3.cc
b/be/src/runtime/buffered_tuple_stream3.cc
index 8c05608831..2a35f5c70c 100644
--- a/be/src/runtime/buffered_tuple_stream3.cc
+++ b/be/src/runtime/buffered_tuple_stream3.cc
@@ -55,7 +55,6 @@ BufferedTupleStream3::BufferedTupleStream3(RuntimeState*
state, const RowDescrip
num_pages_(0),
total_byte_size_(0),
has_read_iterator_(false),
- read_page_reservation_(buffer_pool_client_),
read_page_rows_returned_(-1),
read_ptr_(nullptr),
read_end_ptr_(nullptr),
@@ -64,7 +63,6 @@ BufferedTupleStream3::BufferedTupleStream3(RuntimeState*
state, const RowDescrip
rows_returned_(0),
has_write_iterator_(false),
write_page_(nullptr),
- write_page_reservation_(buffer_pool_client_),
bytes_pinned_(0),
num_rows_(0),
default_page_len_(default_page_len),
@@ -139,16 +137,6 @@ void BufferedTupleStream3::CheckConsistencyFast() const {
// flight and this would required blocking on that write.
DCHECK_GE(read_end_ptr_, read_ptr_);
}
- if (NeedReadReservation()) {
- DCHECK_EQ(default_page_len_, read_page_reservation_.GetReservation())
<< DebugString();
- } else if (!read_page_reservation_.is_closed()) {
- DCHECK_EQ(0, read_page_reservation_.GetReservation());
- }
- if (NeedWriteReservation()) {
- DCHECK_EQ(default_page_len_, write_page_reservation_.GetReservation());
- } else if (!write_page_reservation_.is_closed()) {
- DCHECK_EQ(0, write_page_reservation_.GetReservation());
- }
}
void BufferedTupleStream3::CheckPageConsistency(const Page* page) const {
@@ -172,19 +160,6 @@ string BufferedTupleStream3::DebugString() const {
} else {
ss << &*read_page_;
}
- ss << "\n"
- << " read_page_reservation=";
- if (read_page_reservation_.is_closed()) {
- ss << "<closed>";
- } else {
- ss << read_page_reservation_.GetReservation();
- }
- ss << " write_page_reservation=";
- if (write_page_reservation_.is_closed()) {
- ss << "<closed>";
- } else {
- ss << write_page_reservation_.GetReservation();
- }
ss << "\n # pages=" << num_pages_ << " pages=[\n";
for (const Page& page : pages_) {
ss << "{" << page.DebugString() << "}";
@@ -205,7 +180,7 @@ Status BufferedTupleStream3::Init(int node_id, bool pinned)
{
return Status::OK();
}
-Status BufferedTupleStream3::PrepareForWrite(bool* got_reservation) {
+Status BufferedTupleStream3::PrepareForWrite() {
// This must be the first iterator created.
DCHECK(pages_.empty());
DCHECK(!delete_on_read_);
@@ -213,16 +188,11 @@ Status BufferedTupleStream3::PrepareForWrite(bool*
got_reservation) {
DCHECK(!has_read_iterator());
CHECK_CONSISTENCY_FULL();
- *got_reservation =
buffer_pool_client_->IncreaseReservationToFit(default_page_len_);
- if (!*got_reservation) return Status::OK();
has_write_iterator_ = true;
- // Save reservation for the write iterators.
- buffer_pool_client_->SaveReservation(&write_page_reservation_,
default_page_len_);
- CHECK_CONSISTENCY_FULL();
return Status::OK();
}
-Status BufferedTupleStream3::PrepareForReadWrite(bool delete_on_read, bool*
got_reservation) {
+Status BufferedTupleStream3::PrepareForReadWrite(bool delete_on_read) {
// This must be the first iterator created.
DCHECK(pages_.empty());
DCHECK(!delete_on_read_);
@@ -230,12 +200,7 @@ Status BufferedTupleStream3::PrepareForReadWrite(bool
delete_on_read, bool* got_
DCHECK(!has_read_iterator());
CHECK_CONSISTENCY_FULL();
- *got_reservation = buffer_pool_client_->IncreaseReservationToFit(2 *
default_page_len_);
- if (!*got_reservation) return Status::OK();
has_write_iterator_ = true;
- // Save reservation for both the read and write iterators.
- buffer_pool_client_->SaveReservation(&read_page_reservation_,
default_page_len_);
- buffer_pool_client_->SaveReservation(&write_page_reservation_,
default_page_len_);
RETURN_IF_ERROR(PrepareForReadInternal(delete_on_read));
return Status::OK();
}
@@ -254,8 +219,6 @@ void BufferedTupleStream3::Close(RowBatch* batch,
RowBatch::FlushMode flush) {
buffer_pool_->DestroyPage(buffer_pool_client_, &page.handle);
}
}
- read_page_reservation_.Close();
- write_page_reservation_.Close();
pages_.clear();
num_pages_ = 0;
bytes_pinned_ = 0;
@@ -297,62 +260,6 @@ void BufferedTupleStream3::UnpinPageIfNeeded(Page* page,
bool stream_pinned) {
}
}
-bool BufferedTupleStream3::NeedWriteReservation() const {
- return NeedWriteReservation(pinned_);
-}
-
-bool BufferedTupleStream3::NeedWriteReservation(bool stream_pinned) const {
- return NeedWriteReservation(stream_pinned, num_pages_,
has_write_iterator(),
- write_page_ != nullptr, has_read_write_page());
-}
-
-bool BufferedTupleStream3::NeedWriteReservation(bool stream_pinned, int64_t
num_pages,
- bool has_write_iterator, bool
has_write_page,
- bool has_read_write_page) {
- if (!has_write_iterator) return false;
- // If the stream is empty the write reservation hasn't been used yet.
- if (num_pages == 0) return true;
- if (stream_pinned) {
- // Make sure we've saved the write reservation for the next page if
the only
- // page is a read/write page.
- return has_read_write_page && num_pages == 1;
- } else {
- // Make sure we've saved the write reservation if it's not being used
to pin
- // a page in the stream.
- return !has_write_page || has_read_write_page;
- }
-}
-
-bool BufferedTupleStream3::NeedReadReservation() const {
- return NeedReadReservation(pinned_);
-}
-
-bool BufferedTupleStream3::NeedReadReservation(bool stream_pinned) const {
- return NeedReadReservation(stream_pinned, num_pages_, has_read_iterator(),
- read_page_ != pages_.end());
-}
-
-bool BufferedTupleStream3::NeedReadReservation(bool stream_pinned, int64_t
num_pages,
- bool has_read_iterator, bool
has_read_page) const {
- return NeedReadReservation(stream_pinned, num_pages, has_read_iterator,
has_read_page,
- has_write_iterator(), write_page_ != nullptr);
-}
-
-bool BufferedTupleStream3::NeedReadReservation(bool stream_pinned, int64_t
num_pages,
- bool has_read_iterator, bool
has_read_page,
- bool has_write_iterator, bool
has_write_page) {
- if (!has_read_iterator) return false;
- if (stream_pinned) {
- // Need reservation if there are no pages currently pinned for reading
but we may add
- // a page.
- return num_pages == 0 && has_write_iterator;
- } else {
- // Only need to save reservation for an unpinned stream if there is no
read page
- // and we may advance to one in the future.
- return (has_write_iterator || num_pages > 0) && !has_read_page;
- }
-}
-
Status BufferedTupleStream3::NewWritePage(int64_t page_len) noexcept {
DCHECK(!closed_);
DCHECK(write_page_ == nullptr);
@@ -377,59 +284,19 @@ void BufferedTupleStream3::CalcPageLenForRow(int64_t
row_size, int64_t* page_len
*page_len = std::max(default_page_len_,
BitUtil::RoundUpToPowerOfTwo(row_size));
}
-Status BufferedTupleStream3::AdvanceWritePage(int64_t row_size, bool*
got_reservation) noexcept {
+Status BufferedTupleStream3::AdvanceWritePage(int64_t row_size) noexcept {
DCHECK(has_write_iterator());
CHECK_CONSISTENCY_FAST();
int64_t page_len;
CalcPageLenForRow(row_size, &page_len);
-
- // Reservation may have been saved for the next write page, e.g. by
PrepareForWrite()
- // if the stream is empty.
- int64_t write_reservation_to_restore = 0, read_reservation_to_restore = 0;
- if (NeedWriteReservation(pinned_, num_pages_, true, write_page_ != nullptr,
- has_read_write_page()) &&
- !NeedWriteReservation(pinned_, num_pages_ + 1, true, true, false)) {
- write_reservation_to_restore = default_page_len_;
- }
- // If the stream is pinned, we need to keep the previous write page pinned
for reading.
- // Check if we saved reservation for this case.
- if (NeedReadReservation(pinned_, num_pages_, has_read_iterator(),
read_page_ != pages_.end(),
- true, write_page_ != nullptr) &&
- !NeedReadReservation(pinned_, num_pages_ + 1, has_read_iterator(),
- read_page_ != pages_.end(), true, true)) {
- read_reservation_to_restore = default_page_len_;
- }
-
- // We may reclaim reservation by unpinning a page that was pinned for
writing.
- int64_t write_page_reservation_to_reclaim =
- (write_page_ != nullptr && !pinned_ && !has_read_write_page()) ?
write_page_->len() : 0;
- // Check to see if we can get the reservation before changing the state of
the stream.
- if (!buffer_pool_client_->IncreaseReservationToFit(page_len -
write_reservation_to_restore -
-
read_reservation_to_restore -
-
write_page_reservation_to_reclaim)) {
- DCHECK(pinned_ || page_len > default_page_len_)
- << "If the stream is unpinned, this should only fail for large
pages";
- CHECK_CONSISTENCY_FAST();
- *got_reservation = false;
- return Status::OK();
- }
- if (write_reservation_to_restore > 0) {
- buffer_pool_client_->RestoreReservation(&write_page_reservation_,
- write_reservation_to_restore);
- }
- if (read_reservation_to_restore > 0) {
- buffer_pool_client_->RestoreReservation(&read_page_reservation_,
- read_reservation_to_restore);
- }
ResetWritePage();
//RETURN_IF_ERROR(NewWritePage(page_len));
Status status = NewWritePage(page_len);
if (UNLIKELY(!status.ok())) {
return status;
}
- *got_reservation = true;
return Status::OK();
}
@@ -450,15 +317,6 @@ void BufferedTupleStream3::InvalidateWriteIterator() {
if (!has_write_iterator()) return;
ResetWritePage();
has_write_iterator_ = false;
- // No more pages will be appended to stream - do not need any write
reservation.
- write_page_reservation_.Close();
- // May not need a read reservation once the write iterator is invalidated.
- if (NeedReadReservation(pinned_, num_pages_, has_read_iterator(),
read_page_ != pages_.end(),
- true, write_page_ != nullptr) &&
- !NeedReadReservation(pinned_, num_pages_, has_read_iterator(),
read_page_ != pages_.end(),
- false, false)) {
- buffer_pool_client_->RestoreReservation(&read_page_reservation_,
default_page_len_);
- }
}
Status BufferedTupleStream3::NextReadPage() {
@@ -470,10 +328,6 @@ Status BufferedTupleStream3::NextReadPage() {
// No rows read yet - start reading at first page. If the stream is
unpinned, we can
// use the reservation saved in PrepareForReadWrite() to pin the first
page.
read_page_ = pages_.begin();
- if (NeedReadReservation(pinned_, num_pages_, true, false) &&
- !NeedReadReservation(pinned_, num_pages_, true, true)) {
- buffer_pool_client_->RestoreReservation(&read_page_reservation_,
default_page_len_);
- }
} else if (delete_on_read_) {
DCHECK(read_page_ == pages_.begin()) << read_page_->DebugString() << "
" << DebugString();
DCHECK_NE(&*read_page_, write_page_);
@@ -494,18 +348,6 @@ Status BufferedTupleStream3::NextReadPage() {
return Status::OK();
}
- if (!pinned_ && read_page_->len() > default_page_len_ &&
- buffer_pool_client_->GetUnusedReservation() < read_page_->len()) {
- // If we are iterating over an unpinned stream and encounter a page
that is larger
- // than the default page length, then unpinning the previous page may
not have
- // freed up enough reservation to pin the next one. The client is
responsible for
- // ensuring the reservation is available, so this indicates a bug.
- std::stringstream err_stream;
- err_stream << "Internal error: couldn't pin large page of " <<
read_page_->len()
- << " bytes, client only had " <<
buffer_pool_client_->GetUnusedReservation()
- << " bytes of unused reservation:" <<
buffer_pool_client_->DebugString() << "\n";
- return Status::InternalError(err_stream.str());
- }
// Ensure the next page is pinned for reading. By this point we should
have enough
// reservation to pin the page. If the stream is pinned, the page is
already pinned.
// If the stream is unpinned, we freed up enough memory for a
default-sized page by
@@ -521,14 +363,6 @@ Status BufferedTupleStream3::NextReadPage() {
read_ptr_ = read_buffer->data();
read_end_ptr_ = read_ptr_ + read_buffer->len();
- // We may need to save reservation for the write page in the case when the
write page
- // became a read/write page.
- if (!NeedWriteReservation(pinned_, num_pages_, has_write_iterator(),
write_page_ != nullptr,
- false) &&
- NeedWriteReservation(pinned_, num_pages_, has_write_iterator(),
write_page_ != nullptr,
- has_read_write_page())) {
- buffer_pool_client_->SaveReservation(&write_page_reservation_,
default_page_len_);
- }
CHECK_CONSISTENCY_FAST();
return Status::OK();
}
@@ -545,22 +379,15 @@ void BufferedTupleStream3::InvalidateReadIterator() {
UnpinPageIfNeeded(prev_read_page, pinned_);
}
has_read_iterator_ = false;
- if (read_page_reservation_.GetReservation() > 0) {
- buffer_pool_client_->RestoreReservation(&read_page_reservation_,
default_page_len_);
- }
// It is safe to re-read a delete-on-read stream if no rows were read and
no pages
// were therefore deleted.
if (rows_returned_ == 0) delete_on_read_ = false;
}
-Status BufferedTupleStream3::PrepareForRead(bool delete_on_read, bool*
got_reservation) {
+Status BufferedTupleStream3::PrepareForRead(bool delete_on_read) {
CHECK_CONSISTENCY_FULL();
InvalidateWriteIterator();
InvalidateReadIterator();
- // If already pinned, no additional pin is needed (see ExpectedPinCount()).
- *got_reservation = pinned_ || pages_.empty() ||
-
buffer_pool_client_->IncreaseReservationToFit(default_page_len_);
- if (!*got_reservation) return Status::OK();
return PrepareForReadInternal(delete_on_read);
}
@@ -603,28 +430,6 @@ Status BufferedTupleStream3::PinStream(bool* pinned) {
return Status::OK();
}
*pinned = false;
- // First, make sure we have the reservation to pin all the pages for
reading.
- int64_t bytes_to_pin = 0;
- for (Page& page : pages_) {
- bytes_to_pin += (ExpectedPinCount(true, &page) - page.pin_count()) *
page.len();
- }
-
- // Check if we have some reservation to restore.
- bool restore_write_reservation = NeedWriteReservation(false) &&
!NeedWriteReservation(true);
- bool restore_read_reservation = NeedReadReservation(false) &&
!NeedReadReservation(true);
- int64_t increase_needed = bytes_to_pin - (restore_write_reservation ?
default_page_len_ : 0) -
- (restore_read_reservation ? default_page_len_ :
0);
- bool reservation_granted =
buffer_pool_client_->IncreaseReservationToFit(increase_needed);
- if (!reservation_granted) return Status::OK();
-
- // If there is no current write page we should have some saved reservation
to use.
- // Only continue saving it if the stream is empty and need it to pin the
first page.
- if (restore_write_reservation) {
- buffer_pool_client_->RestoreReservation(&write_page_reservation_,
default_page_len_);
- }
- if (restore_read_reservation) {
- buffer_pool_client_->RestoreReservation(&read_page_reservation_,
default_page_len_);
- }
// At this point success is guaranteed - go through to pin the pages we
need to pin.
// If the page data was evicted from memory, the read I/O can happen in
parallel
@@ -652,13 +457,6 @@ void BufferedTupleStream3::UnpinStream(UnpinMode mode) {
// be unpinned at this point.
for (Page& page : pages_) UnpinPageIfNeeded(&page, false);
- // Check to see if we need to save some of the reservation we freed up.
- if (!NeedWriteReservation(true) && NeedWriteReservation(false)) {
- buffer_pool_client_->SaveReservation(&write_page_reservation_,
default_page_len_);
- }
- if (!NeedReadReservation(true) && NeedReadReservation(false)) {
- buffer_pool_client_->SaveReservation(&read_page_reservation_,
default_page_len_);
- }
pinned_ = false;
}
CHECK_CONSISTENCY_FULL();
@@ -674,9 +472,7 @@ Status
BufferedTupleStream3::GetRows(std::unique_ptr<RowBatch>* batch, bool* got
}
RETURN_IF_ERROR(PinStream(got_rows));
if (!*got_rows) return Status::OK();
- bool got_reservation;
- RETURN_IF_ERROR(PrepareForRead(false, &got_reservation));
- DCHECK(got_reservation) << "Stream was pinned";
+ RETURN_IF_ERROR(PrepareForRead(false));
// TODO chenhao
// capacity in RowBatch use int, but _num_rows is int64_t
@@ -886,9 +682,8 @@ bool BufferedTupleStream3::AddRowSlow(TupleRow* row,
Status* status) noexcept {
}
uint8_t* BufferedTupleStream3::AddRowCustomBeginSlow(int64_t size, Status*
status) noexcept {
- bool got_reservation = false;
- *status = AdvanceWritePage(size, &got_reservation);
- if (!status->ok() || !got_reservation) {
+ *status = AdvanceWritePage(size);
+ if (!status->ok()) {
return nullptr;
}
// We have a large-enough page so now success is guaranteed.
@@ -902,11 +697,6 @@ void BufferedTupleStream3::AddLargeRowCustomEnd(int64_t
size) noexcept {
// Immediately unpin the large write page so that we're not using up extra
reservation
// and so we don't append another row to the page.
ResetWritePage();
- // Save some of the reservation we freed up so we can create the next
write page when
- // needed.
- if (NeedWriteReservation()) {
- buffer_pool_client_->SaveReservation(&write_page_reservation_,
default_page_len_);
- }
// The stream should be in a consistent state once the row is added.
CHECK_CONSISTENCY_FAST();
}
diff --git a/be/src/runtime/buffered_tuple_stream3.h
b/be/src/runtime/buffered_tuple_stream3.h
index 9af6d21034..6f5ba3dae4 100644
--- a/be/src/runtime/buffered_tuple_stream3.h
+++ b/be/src/runtime/buffered_tuple_stream3.h
@@ -228,7 +228,7 @@ public:
/// 'got_reservation': set to true if there was enough reservation to
initialize the
/// first write page and false if there was not enough reservation and
no other
/// error was encountered. Undefined if an error status is returned.
- Status PrepareForWrite(bool* got_reservation) WARN_UNUSED_RESULT;
+ Status PrepareForWrite() WARN_UNUSED_RESULT;
/// Prepares the stream for interleaved reads and writes by saving enough
reservation
/// for default-sized read and write pages. Called after Init() and before
the first
@@ -237,7 +237,7 @@ public:
/// 'got_reservation': set to true if there was enough reservation to
initialize the
/// read and write pages and false if there was not enough reservation
and no other
/// error was encountered. Undefined if an error status is returned.
- Status PrepareForReadWrite(bool delete_on_read, bool* got_reservation)
WARN_UNUSED_RESULT;
+ Status PrepareForReadWrite(bool delete_on_read) WARN_UNUSED_RESULT;
/// Prepares the stream for reading, invalidating the write iterator (if
there is one).
/// Therefore must be called after the last AddRow() or AddRowCustomEnd()
and before
@@ -248,7 +248,7 @@ public:
/// 'got_reservation': set to true if there was enough reservation to
initialize the
/// first read page and false if there was not enough reservation and
no other
/// error was encountered. Undefined if an error status is returned.
- Status PrepareForRead(bool delete_on_read, bool* got_reservation)
WARN_UNUSED_RESULT;
+ Status PrepareForRead(bool delete_on_read) WARN_UNUSED_RESULT;
/// Adds a single row to the stream. There are three possible outcomes:
/// a) The append succeeds. True is returned.
@@ -447,11 +447,6 @@ private:
/// status was returned.
std::list<Page>::iterator read_page_;
- /// Saved reservation for read iterator. 'default_page_len_' reservation
is saved if
- /// there is a read iterator, no pinned read page, and the possibility
that the read
- /// iterator will advance to a valid page.
- BufferPool::SubReservation read_page_reservation_;
-
/// Number of rows returned from the current read_page_.
uint32_t read_page_rows_returned_;
@@ -479,15 +474,6 @@ private:
/// appending a larger row between AddRowCustomBegin() and
AddRowCustomEnd().
Page* write_page_;
- /// Saved reservation for write iterator. 'default_page_len_' reservation
is saved if
- /// there is a write iterator, no page currently pinned for writing and
the possibility
- /// that a pin count will be needed for the write iterator in future.
Specifically if:
- /// * no rows have been appended to the stream and 'pages_' is empty, or
- /// * the stream is unpinned, 'write_page_' is null and and the last page
in 'pages_'
- /// is a large page that we advanced past, or
- /// * there is only one pinned page in the stream and it is already pinned
for reading.
- BufferPool::SubReservation write_page_reservation_;
-
/// Total bytes of pinned pages in pages_, stored to avoid iterating over
the list
/// to compute it.
int64_t bytes_pinned_;
@@ -570,7 +556,7 @@ private:
/// allocated. Returns an error if the row cannot fit in a page. Returns
OK and sets
/// 'got_reservation' to false if the reservation could not be increased
and no other
/// error was encountered.
- Status AdvanceWritePage(int64_t row_size, bool* got_reservation) noexcept
WARN_UNUSED_RESULT;
+ Status AdvanceWritePage(int64_t row_size) noexcept WARN_UNUSED_RESULT;
/// Reset the write page, if there is one, and unpin pages accordingly. If
there
/// is an active write iterator, the next row will be appended to a new
page.
@@ -618,36 +604,6 @@ private:
/// read and write pages and whether the stream is pinned.
int ExpectedPinCount(bool stream_pinned, const Page* page) const;
- /// Return true if the stream in its current state needs to have a
reservation for
- /// a write page stored in 'write_page_reservation_'.
- bool NeedWriteReservation() const;
-
- /// Same as above, except assume the stream's 'pinned_' state is
'stream_pinned'.
- bool NeedWriteReservation(bool stream_pinned) const;
-
- /// Same as above, except assume the stream has 'num_pages' pages and
different
- /// iterator state.
- static bool NeedWriteReservation(bool stream_pinned, int64_t num_pages,
bool has_write_iterator,
- bool has_write_page, bool
has_read_write_page);
-
- /// Return true if the stream in its current state needs to have a
reservation for
- /// a read page stored in 'read_page_reservation_'.
- bool NeedReadReservation() const;
-
- /// Same as above, except assume the stream's 'pinned_' state is
'stream_pinned'.
- bool NeedReadReservation(bool stream_pinned) const;
-
- /// Same as above, except assume the stream has 'num_pages' pages and a
different
- /// read iterator state.
- bool NeedReadReservation(bool stream_pinned, int64_t num_pages, bool
has_read_iterator,
- bool has_read_page) const;
-
- /// Same as above, except assume the stream has 'num_pages' pages and a
different
- /// write iterator state.
- static bool NeedReadReservation(bool stream_pinned, int64_t num_pages,
bool has_read_iterator,
- bool has_read_page, bool
has_write_iterator,
- bool has_write_page);
-
/// Templated GetNext implementations.
template <bool FILL_FLAT_ROWS>
Status GetNextInternal(RowBatch* batch, bool* eos,
std::vector<FlatRowPtr>* flat_rows);
diff --git a/be/src/runtime/bufferpool/buffer_pool.cc
b/be/src/runtime/bufferpool/buffer_pool.cc
index eda74e3530..26c43f2b9e 100644
--- a/be/src/runtime/bufferpool/buffer_pool.cc
+++ b/be/src/runtime/bufferpool/buffer_pool.cc
@@ -114,13 +114,11 @@ BufferPool::BufferPool(int64_t min_buffer_len, int64_t
buffer_bytes_limit,
BufferPool::~BufferPool() {}
-Status BufferPool::RegisterClient(const string& name, ReservationTracker*
parent_reservation,
- int64_t reservation_limit, RuntimeProfile*
profile,
+Status BufferPool::RegisterClient(const string& name, RuntimeProfile* profile,
ClientHandle* client) {
DCHECK(!client->is_registered());
- DCHECK(parent_reservation != nullptr);
client->impl_ = new Client(this, //file_group,
- name, parent_reservation, reservation_limit,
profile);
+ name, profile);
return Status::OK();
}
@@ -179,7 +177,6 @@ Status BufferPool::Pin(ClientHandle* client, PageHandle*
handle) {
}
// Update accounting last to avoid complicating the error return path
above.
++page->pin_count;
- client->impl_->reservation()->AllocateFrom(page->len);
return Status::OK();
}
@@ -190,8 +187,6 @@ void BufferPool::Unpin(ClientHandle* client, PageHandle*
handle) {
// If handle is pinned, we can assume that the page itself is pinned.
DCHECK(handle->is_pinned());
Page* page = handle->page_;
- ReservationTracker* reservation = client->impl_->reservation();
- reservation->ReleaseTo(page->len);
if (--page->pin_count > 0) return;
//if (page->pin_in_flight) {
@@ -249,8 +244,6 @@ Status BufferPool::TransferBuffer(ClientHandle* src_client,
BufferHandle* src,
DCHECK_NE(src, dst);
DCHECK_NE(src_client, dst_client);
- dst_client->impl_->reservation()->AllocateFrom(src->len());
- src_client->impl_->reservation()->ReleaseTo(src->len());
*dst = std::move(*src);
dst->client_ = dst_client;
return Status::OK();
@@ -292,82 +285,12 @@ int64_t BufferPool::GetFreeBufferBytes() const {
return allocator_->GetFreeBufferBytes();
}
-bool BufferPool::ClientHandle::IncreaseReservation(int64_t bytes) {
- return impl_->reservation()->IncreaseReservation(bytes);
-}
-
-bool BufferPool::ClientHandle::IncreaseReservationToFit(int64_t bytes) {
- return impl_->reservation()->IncreaseReservationToFit(bytes);
-}
-
-Status BufferPool::ClientHandle::DecreaseReservationTo(int64_t target_bytes) {
- return impl_->DecreaseReservationTo(target_bytes);
-}
-
-int64_t BufferPool::ClientHandle::GetReservation() const {
- return impl_->reservation()->GetReservation();
-}
-
-int64_t BufferPool::ClientHandle::GetUsedReservation() const {
- return impl_->reservation()->GetUsedReservation();
-}
-
-int64_t BufferPool::ClientHandle::GetUnusedReservation() const {
- return impl_->reservation()->GetUnusedReservation();
-}
-
-bool BufferPool::ClientHandle::TransferReservationFrom(ReservationTracker*
src, int64_t bytes) {
- return src->TransferReservationTo(impl_->reservation(), bytes);
-}
-
-bool BufferPool::ClientHandle::TransferReservationTo(ReservationTracker* dst,
int64_t bytes) {
- return impl_->reservation()->TransferReservationTo(dst, bytes);
-}
-
-void BufferPool::ClientHandle::SaveReservation(SubReservation* dst, int64_t
bytes) {
- DCHECK_EQ(dst->tracker_->parent(), impl_->reservation());
- bool success =
impl_->reservation()->TransferReservationTo(dst->tracker_.get(), bytes);
- DCHECK(success); // SubReservation should not have a limit, so this
shouldn't fail.
-}
-
-void BufferPool::ClientHandle::RestoreReservation(SubReservation* src, int64_t
bytes) {
- DCHECK_EQ(src->tracker_->parent(), impl_->reservation());
- bool success = src->tracker_->TransferReservationTo(impl_->reservation(),
bytes);
- DCHECK(success); // Transferring reservation to parent shouldn't fail.
-}
-
-void BufferPool::ClientHandle::SetDebugDenyIncreaseReservation(double
probability) {
- impl_->reservation()->SetDebugDenyIncreaseReservation(probability);
-}
-
bool BufferPool::ClientHandle::has_unpinned_pages() const {
return impl_->has_unpinned_pages();
}
-BufferPool::SubReservation::SubReservation(ClientHandle* client) {
- tracker_.reset(new ReservationTracker);
- tracker_->InitChildTracker(nullptr, client->impl_->reservation(),
- numeric_limits<int64_t>::max());
-}
-
-BufferPool::SubReservation::~SubReservation() {}
-
-int64_t BufferPool::SubReservation::GetReservation() const {
- return tracker_->GetReservation();
-}
-
-void BufferPool::SubReservation::Close() {
- // Give any reservation back to the client.
- if (is_closed()) return;
- bool success = tracker_->TransferReservationTo(tracker_->parent(),
tracker_->GetReservation());
- DCHECK(success); // Transferring reservation to parent shouldn't fail.
- tracker_->Close();
- tracker_.reset();
-}
-
BufferPool::Client::Client(BufferPool* pool, //TmpFileMgr::FileGroup*
file_group,
- const string& name, ReservationTracker*
parent_reservation,
- int64_t reservation_limit, RuntimeProfile* profile)
+ const string& name, RuntimeProfile* profile)
: pool_(pool),
//file_group_(file_group),
name_(name),
@@ -376,7 +299,6 @@ BufferPool::Client::Client(BufferPool* pool,
//TmpFileMgr::FileGroup* file_group
buffers_allocated_bytes_(0) {
// Set up a child profile with buffer pool info.
RuntimeProfile* child_profile = profile->create_child("Buffer pool", true,
true);
- reservation_.InitChildTracker(child_profile, parent_reservation,
reservation_limit);
counters_.alloc_time = ADD_TIMER(child_profile, "AllocTime");
counters_.cumulative_allocations =
ADD_COUNTER(child_profile, "CumulativeAllocations", TUnit::UNIT);
@@ -544,25 +466,11 @@ Status
BufferPool::Client::PrepareToAllocateBuffer(int64_t len) {
// Clean enough pages to allow allocation to proceed without violating our
eviction
// policy. This can fail, so only update the accounting once success is
ensured.
//RETURN_IF_ERROR(CleanPages(&lock, len));
- reservation_.AllocateFrom(len);
buffers_allocated_bytes_ += len;
DCHECK_CONSISTENCY();
return Status::OK();
}
-Status BufferPool::Client::DecreaseReservationTo(int64_t target_bytes) {
- std::unique_lock<std::mutex> lock(lock_);
- int64_t current_reservation = reservation_.GetReservation();
- DCHECK_GE(current_reservation, target_bytes);
- int64_t amount_to_free =
- std::min(reservation_.GetUnusedReservation(), current_reservation
- target_bytes);
- if (amount_to_free == 0) return Status::OK();
- // Clean enough pages to allow us to safely release reservation.
- //RETURN_IF_ERROR(CleanPages(&lock, amount_to_free));
- reservation_.DecreaseReservation(amount_to_free);
- return Status::OK();
-}
-
Status BufferPool::Client::CleanPages(std::unique_lock<std::mutex>*
client_lock, int64_t len) {
DCheckHoldsLock(*client_lock);
DCHECK_CONSISTENCY();
@@ -693,8 +601,7 @@ string BufferPool::Client::DebugString() {
<< buffers_allocated_bytes_ << " num_pages: " << num_pages_
<< " pinned_bytes: " << pinned_pages_.bytes()
<< " dirty_unpinned_bytes: " << dirty_unpinned_pages_.bytes()
- << " in_flight_write_bytes: " << in_flight_write_pages_.bytes()
- << " reservation: " << reservation_.DebugString();
+ << " in_flight_write_bytes: " << in_flight_write_pages_.bytes();
ss << "\n " << pinned_pages_.size() << " pinned pages: ";
pinned_pages_.iterate(std::bind<bool>(Page::DebugStringCallback, &ss,
std::placeholders::_1));
ss << "\n " << dirty_unpinned_pages_.size() << " dirty unpinned pages: ";
diff --git a/be/src/runtime/bufferpool/buffer_pool.h
b/be/src/runtime/bufferpool/buffer_pool.h
index 9a378934a2..469f5071db 100644
--- a/be/src/runtime/bufferpool/buffer_pool.h
+++ b/be/src/runtime/bufferpool/buffer_pool.h
@@ -34,7 +34,6 @@
namespace doris {
-class ReservationTracker;
class RuntimeProfile;
class SystemAllocator;
class MemTracker;
@@ -149,7 +148,6 @@ public:
class BufferHandle;
class ClientHandle;
class PageHandle;
- class SubReservation;
/// Constructs a new buffer pool.
/// 'min_buffer_len': the minimum buffer length for the pool. Must be a
power of two.
/// 'buffer_bytes_limit': the maximum physical memory in bytes that can be
used by the
@@ -167,12 +165,7 @@ public:
/// not allowed for this client. Counters for this client are added to the
(non-nullptr)
/// 'profile'. 'client' is the client to register. 'client' must not
already be
/// registered.
- ///
- /// The client's reservation is created as a child of 'parent_reservation'
with limit
- /// 'reservation_limit' and associated with MemTracker 'mem_tracker'. The
initial
- /// reservation is 0 bytes.
- Status RegisterClient(const std::string& name, ReservationTracker*
parent_reservation,
- int64_t reservation_limit, RuntimeProfile* profile,
+ Status RegisterClient(const std::string& name, RuntimeProfile* profile,
ClientHandle* client) WARN_UNUSED_RESULT;
/// Deregister 'client' if it is registered. All pages must be destroyed
and buffers
@@ -315,49 +308,6 @@ public:
/// Client must be deregistered.
~ClientHandle() { DCHECK(!is_registered()); }
- /// Request to increase reservation for this client by 'bytes' by calling
- /// ReservationTracker::IncreaseReservation(). Returns true if the
reservation was
- /// successfully increased.
- bool IncreaseReservation(int64_t bytes) WARN_UNUSED_RESULT;
-
- /// Tries to ensure that 'bytes' of unused reservation is available for
this client
- /// to use by calling ReservationTracker::IncreaseReservationToFit().
Returns true
- /// if successful, after which 'bytes' can be used.
- bool IncreaseReservationToFit(int64_t bytes) WARN_UNUSED_RESULT;
-
- /// Try to decrease this client's reservation down to a minimum of
'target_bytes' by
- /// releasing unused reservation to ancestor ReservationTrackers, all the
way up to
- /// the root of the ReservationTracker tree. May block waiting for
unpinned pages to
- /// be flushed. This client's reservation must be at least 'target_bytes'
before
- /// calling this method. May fail if decreasing the reservation requires
flushing
- /// unpinned pages to disk and a write to disk fails.
- Status DecreaseReservationTo(int64_t target_bytes) WARN_UNUSED_RESULT;
-
- /// Move some of this client's reservation to the SubReservation. 'bytes'
of unused
- /// reservation must be available in this tracker.
- void SaveReservation(SubReservation* dst, int64_t bytes);
-
- /// Move some of src's reservation to this client. 'bytes' of unused
reservation must be
- /// available in 'src'.
- void RestoreReservation(SubReservation* src, int64_t bytes);
-
- /// Accessors for this client's reservation corresponding to the
identically-named
- /// methods in ReservationTracker.
- int64_t GetReservation() const;
- int64_t GetUsedReservation() const;
- int64_t GetUnusedReservation() const;
-
- /// Try to transfer 'bytes' of reservation from 'src' to this client using
- /// ReservationTracker::TransferReservationTo().
- bool TransferReservationFrom(ReservationTracker* src, int64_t bytes);
-
- /// Transfer 'bytes' of reservation from this client to 'dst' using
- /// ReservationTracker::TransferReservationTo().
- bool TransferReservationTo(ReservationTracker* dst, int64_t bytes);
-
- /// Call SetDebugDenyIncreaseReservation() on this client's
ReservationTracker.
- void SetDebugDenyIncreaseReservation(double probability);
-
bool is_registered() const { return impl_ != nullptr; }
/// Return true if there are any unpinned pages for this client.
@@ -368,7 +318,6 @@ public:
private:
friend class BufferPool;
friend class BufferPoolTest;
- friend class SubReservation;
DISALLOW_COPY_AND_ASSIGN(ClientHandle);
/// Internal state for the client. nullptr means the client isn't
registered.
@@ -376,31 +325,6 @@ private:
Client* impl_;
};
-/// Helper class that allows dividing up a client's reservation into separate
buckets.
-class BufferPool::SubReservation {
-public:
- SubReservation(ClientHandle* client);
- ~SubReservation();
-
- /// Returns the amount of reservation stored in this sub-reservation.
- int64_t GetReservation() const;
-
- /// Releases the sub-reservation to the client's tracker. Must be called
before
- /// destruction.
- void Close();
-
- bool is_closed() const { return tracker_ == nullptr; }
-
-private:
- friend class BufferPool::ClientHandle;
- DISALLOW_COPY_AND_ASSIGN(SubReservation);
-
- /// Child of the client's tracker used to track the sub-reservation. Usage
is not
- /// tracked against this tracker - instead the reservation is always
transferred back
- /// to the client's tracker before use.
- std::unique_ptr<ReservationTracker> tracker_;
-};
-
/// A handle to a buffer allocated from the buffer pool. Each BufferHandle
should only
/// be used by a single thread at a time: concurrently calling BufferHandle
methods or
/// BufferPool methods with the BufferHandle as an argument is not supported.
diff --git a/be/src/runtime/bufferpool/buffer_pool_internal.h
b/be/src/runtime/bufferpool/buffer_pool_internal.h
index 08549cc947..2b0a083268 100644
--- a/be/src/runtime/bufferpool/buffer_pool_internal.h
+++ b/be/src/runtime/bufferpool/buffer_pool_internal.h
@@ -23,7 +23,6 @@
#include "runtime/bufferpool/buffer_pool.h"
#include "runtime/bufferpool/buffer_pool_counters.h"
-#include "runtime/bufferpool/reservation_tracker.h"
// Ensure that DCheckConsistency() function calls get removed in release
builds.
#ifndef NDEBUG
@@ -132,16 +131,14 @@ private:
class BufferPool::Client {
public:
Client(BufferPool* pool, //TmpFileMgr::FileGroup* file_group,
- const std::string& name, ReservationTracker* parent_reservation,
- int64_t reservation_limit, RuntimeProfile* profile);
+ const std::string& name, RuntimeProfile* profile);
~Client() {
DCHECK_EQ(0, num_pages_);
DCHECK_EQ(0, buffers_allocated_bytes_);
}
- /// Release reservation for this client.
- void Close() { reservation_.Close(); }
+ void Close() {}
/// Create a pinned page using 'buffer', which was allocated using
AllocateBuffer().
/// No client or page locks should be held by the caller.
@@ -181,15 +178,11 @@ public:
/// client locks should be held by the caller.
Status PrepareToAllocateBuffer(int64_t len) WARN_UNUSED_RESULT;
- /// Implementation of ClientHandle::DecreaseReservationTo().
- Status DecreaseReservationTo(int64_t target_bytes) WARN_UNUSED_RESULT;
-
/// Called after a buffer of 'len' is freed via the FreeBuffer() API to
update
/// internal accounting and release the buffer to the client's
reservation. No page or
/// client locks should be held by the caller.
void FreedBuffer(int64_t len) {
std::lock_guard<std::mutex> cl(lock_);
- reservation_.ReleaseTo(len);
buffers_allocated_bytes_ -= len;
DCHECK_CONSISTENCY();
}
@@ -208,7 +201,6 @@ public:
DCHECK(client_lock.mutex() == &lock_ && client_lock.owns_lock());
}
- ReservationTracker* reservation() { return &reservation_; }
const BufferPoolClientCounters& counters() const { return counters_; }
//bool spilling_enabled() const { return file_group_ != nullptr; }
void set_debug_write_delay_ms(int val) { debug_write_delay_ms_ = val; }
@@ -230,10 +222,6 @@ private:
DCHECK_LE(
pinned_pages_.size() + dirty_unpinned_pages_.size() +
in_flight_write_pages_.size(),
num_pages_);
- // Check that we flushed enough pages to disk given our eviction
policy.
- DCHECK_GE(reservation_.GetReservation(), buffers_allocated_bytes_ +
pinned_pages_.bytes() +
-
dirty_unpinned_pages_.bytes() +
-
in_flight_write_pages_.bytes());
}
/// Must be called once before allocating or reclaiming a buffer of 'len'.
Ensures that
@@ -269,10 +257,6 @@ private:
/// A name identifying the client.
const std::string name_;
- /// The reservation tracker for the client. All pages pinned by the client
count as
- /// usage against 'reservation_'.
- ReservationTracker reservation_;
-
/// The RuntimeProfile counters for this client, owned by the client's
RuntimeProfile.
/// All non-nullptr.
BufferPoolClientCounters counters_;
diff --git a/be/src/runtime/bufferpool/reservation_tracker.cc
b/be/src/runtime/bufferpool/reservation_tracker.cc
deleted file mode 100644
index 6985edaef7..0000000000
--- a/be/src/runtime/bufferpool/reservation_tracker.cc
+++ /dev/null
@@ -1,401 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/bufferpool/reservation_tracker.h"
-
-#include <algorithm>
-#include <cstdlib>
-
-#include "common/object_pool.h"
-#include "gutil/strings/substitute.h"
-#include "olap/utils.h"
-#include "runtime/memory/mem_tracker_limiter.h"
-#include "runtime/thread_context.h"
-#include "util/dummy_runtime_profile.h"
-#include "util/runtime_profile.h"
-
-namespace doris {
-
-ReservationTracker::ReservationTracker() {}
-
-ReservationTracker::~ReservationTracker() {
- DCHECK(!initialized_);
-}
-
-void ReservationTracker::InitRootTracker(RuntimeProfile* profile, int64_t
reservation_limit) {
- std::lock_guard<SpinLock> l(lock_);
- DCHECK(!initialized_);
- parent_ = nullptr;
- mem_tracker_ = nullptr;
- reservation_limit_ = reservation_limit;
- reservation_ = 0;
- used_reservation_ = 0;
- child_reservations_ = 0;
- initialized_ = true;
-
- InitCounters(profile, reservation_limit_);
- COUNTER_SET(counters_.peak_reservation, reservation_);
-
- CheckConsistency();
-}
-
-void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
ReservationTracker* parent,
- int64_t reservation_limit) {
- DCHECK(parent != nullptr);
- DCHECK_GE(reservation_limit, 0);
-
- std::lock_guard<SpinLock> l(lock_);
- DCHECK(!initialized_);
- parent_ = parent;
- mem_tracker_ = nullptr; // TODO(zxy) remove ReservationTracker later
-
- reservation_limit_ = reservation_limit;
- reservation_ = 0;
- used_reservation_ = 0;
- child_reservations_ = 0;
- initialized_ = true;
-
- if (mem_tracker_ != nullptr) {
- MemTracker* parent_mem_tracker = GetParentMemTracker();
- if (parent_mem_tracker != nullptr) {
- // Make sure the parent links of the MemTrackers correspond to our
parent links.
- // DCHECK_EQ(parent_mem_tracker, mem_tracker_->parent());
- } else {
- // Make sure we didn't leave a gap in the links. E.g. this
tracker's grandparent
- // shouldn't have a MemTracker.
- ReservationTracker* ancestor = parent_;
- while (ancestor != nullptr) {
- DCHECK(ancestor->mem_tracker_ == nullptr);
- ancestor = ancestor->parent_;
- }
- }
- }
-
- InitCounters(profile, reservation_limit_);
-
- CheckConsistency();
-}
-
-void ReservationTracker::InitCounters(RuntimeProfile* profile, int64_t
reservation_limit) {
- if (profile == nullptr) {
- dummy_profile_.reset(new DummyProfile);
- profile = dummy_profile_->profile();
- }
-
- // Check that another tracker's counters aren't already registered in the
profile.
- DCHECK(profile->get_counter("PeakReservation") == nullptr);
- counters_.peak_reservation =
profile->AddHighWaterMarkCounter("PeakReservation", TUnit::BYTES);
- counters_.peak_used_reservation =
- profile->AddHighWaterMarkCounter("PeakUsedReservation",
TUnit::BYTES);
- // Only show the limit if set.
- counters_.reservation_limit = nullptr;
- if (reservation_limit != numeric_limits<int64_t>::max()) {
- counters_.reservation_limit = ADD_COUNTER(profile, "ReservationLimit",
TUnit::BYTES);
- COUNTER_SET(counters_.reservation_limit, reservation_limit);
- }
-}
-
-void ReservationTracker::Close() {
- std::lock_guard<SpinLock> l(lock_);
- if (!initialized_) return;
- CheckConsistency();
- DCHECK_EQ(used_reservation_, 0);
- DCHECK_EQ(child_reservations_, 0);
- // Release any reservation to parent.
- if (parent_ != nullptr) DecreaseReservationLocked(reservation_, false);
- mem_tracker_ = nullptr;
- parent_ = nullptr;
- initialized_ = false;
-}
-
-bool ReservationTracker::IncreaseReservation(int64_t bytes) {
- std::lock_guard<SpinLock> l(lock_);
- return IncreaseReservationInternalLocked(bytes, false, false);
-}
-
-bool ReservationTracker::IncreaseReservationToFit(int64_t bytes) {
- std::lock_guard<SpinLock> l(lock_);
- return IncreaseReservationInternalLocked(bytes, true, false);
-}
-
-bool ReservationTracker::IncreaseReservationInternalLocked(int64_t bytes,
- bool
use_existing_reservation,
- bool
is_child_reservation) {
- DCHECK(initialized_);
- int64_t reservation_increase =
- use_existing_reservation ? std::max<int64_t>(0, bytes -
unused_reservation()) : bytes;
- DCHECK_GE(reservation_increase, 0);
-
- bool granted;
- // Check if the increase is allowed, starting at the bottom of hierarchy.
- if (reservation_increase == 0) {
- granted = true;
- } else if (increase_deny_probability_ != 0.0 &&
- rand() < increase_deny_probability_ * (RAND_MAX + 1L)) {
- // Randomly deny reservation if requested. Use rand() to avoid needing
to set up a RNG.
- // Should be good enough. If the probability is 0.0, this never
triggers. If it is 1.0
- // it always triggers.
- granted = false;
- } else if (reservation_ + reservation_increase > reservation_limit_) {
- granted = false;
- } else {
- if (parent_ == nullptr) {
- granted = true;
- } else {
- std::lock_guard<SpinLock> l(parent_->lock_);
- granted =
parent_->IncreaseReservationInternalLocked(reservation_increase, true, true);
- }
- if (granted && !TryConsumeFromMemTracker(reservation_increase)) {
- granted = false;
- // Roll back changes to ancestors if MemTracker update fails.
- parent_->DecreaseReservation(reservation_increase, true);
- }
- }
-
- if (granted) {
- // The reservation was granted and state updated in all ancestors: we
can modify
- // this tracker's state now.
- UpdateReservation(reservation_increase);
- if (is_child_reservation) child_reservations_ += bytes;
- }
-
- CheckConsistency();
- return granted;
-}
-
-bool ReservationTracker::TryConsumeFromMemTracker(int64_t
reservation_increase) {
- DCHECK_GE(reservation_increase, 0);
- if (mem_tracker_ == nullptr) return true;
- if (GetParentMemTracker() == nullptr) {
- // At the topmost link, which may be a MemTracker with a limit, we
need to use
- // TryConsume() to check the limit.
- Status st =
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit(
- reservation_increase);
- WARN_IF_ERROR(st, "TryConsumeFromMemTracker failed");
- mem_tracker_->consume(reservation_increase);
- return st.ok();
- } else {
- // For lower links, there shouldn't be a limit to enforce, so we just
need to
- // update the consumption of the linked MemTracker since the
reservation is
- // already reflected in its parent.
- mem_tracker_->consume(reservation_increase);
- return true;
- }
-}
-
-void ReservationTracker::ReleaseToMemTracker(int64_t reservation_decrease) {
- DCHECK_GE(reservation_decrease, 0);
- if (mem_tracker_ == nullptr) return;
- mem_tracker_->release(reservation_decrease);
-}
-
-void ReservationTracker::DecreaseReservation(int64_t bytes, bool
is_child_reservation) {
- std::lock_guard<SpinLock> l(lock_);
- DecreaseReservationLocked(bytes, is_child_reservation);
-}
-
-void ReservationTracker::DecreaseReservationLocked(int64_t bytes, bool
is_child_reservation) {
- DCHECK(initialized_);
- DCHECK_GE(reservation_, bytes);
- if (bytes == 0) return;
- if (is_child_reservation) child_reservations_ -= bytes;
- UpdateReservation(-bytes);
- ReleaseToMemTracker(bytes);
- // The reservation should be returned up the tree.
- if (parent_ != nullptr) parent_->DecreaseReservation(bytes, true);
- CheckConsistency();
-}
-
-bool ReservationTracker::TransferReservationTo(ReservationTracker* other,
int64_t bytes) {
- if (other == this) return true;
- // Find the path to the root from both. The root is guaranteed to be a
common ancestor.
- std::vector<ReservationTracker*> path_to_common = FindPathToRoot();
- std::vector<ReservationTracker*> other_path_to_common =
other->FindPathToRoot();
- DCHECK_EQ(path_to_common.back(), other_path_to_common.back());
- ReservationTracker* common_ancestor = path_to_common.back();
- // Remove any common ancestors - they do not need to be updated for this
transfer.
- while (!path_to_common.empty() && !other_path_to_common.empty() &&
- path_to_common.back() == other_path_to_common.back()) {
- common_ancestor = path_to_common.back();
- path_to_common.pop_back();
- other_path_to_common.pop_back();
- }
-
- // At this point, we have three cases:
- // 1. 'common_ancestor' == 'other'. 'other_path_to_common' is empty
because 'other' is
- // the lowest common ancestor. To transfer, we decrease the reservation
on the
- // trackers under 'other', down to 'this'.
- // 2. 'common_ancestor' == 'this'. 'path_to_common' is empty because
'this' is the
- // lowest common ancestor. To transfer, we increase the reservation on
the trackers
- // under 'this', down to 'other'.
- // 3. Neither is an ancestor of the other. Both 'other_path_to_common' and
- // 'path_to_common' are non-empty. We increase the reservation on
trackers from
- // 'other' up to one below the common ancestor (checking limits as
needed) and if
- // successful, decrease reservations on trackers from 'this' up to one
below the
- // common ancestor.
-
- // Lock all of the trackers so we can do the update atomically. Need to be
careful to
- // lock subtrees in the correct order.
- std::vector<std::unique_lock<SpinLock>> locks;
- bool lock_first =
- path_to_common.empty() || other_path_to_common.empty() ||
- lock_sibling_subtree_first(path_to_common.back(),
other_path_to_common.back());
- if (lock_first) {
- for (ReservationTracker* tracker : path_to_common)
locks.emplace_back(tracker->lock_);
- }
- for (ReservationTracker* tracker : other_path_to_common) {
- locks.emplace_back(tracker->lock_);
- }
- if (!lock_first) {
- for (ReservationTracker* tracker : path_to_common)
locks.emplace_back(tracker->lock_);
- }
-
- // Check reservation limits will not be violated before applying any
updates.
- for (ReservationTracker* tracker : other_path_to_common) {
- if (tracker->reservation_ + bytes > tracker->reservation_limit_)
return false;
- }
-
- // Do the updates now that we have checked the limits. We're holding all
the locks
- // so this is all atomic.
- for (ReservationTracker* tracker : other_path_to_common) {
- tracker->UpdateReservation(bytes);
- bool success = tracker->TryConsumeFromMemTracker(bytes);
- DCHECK(success);
- if (tracker != other_path_to_common[0]) tracker->child_reservations_
+= bytes;
- }
-
- for (ReservationTracker* tracker : path_to_common) {
- if (tracker != path_to_common[0]) tracker->child_reservations_ -=
bytes;
- tracker->UpdateReservation(-bytes);
- tracker->ReleaseToMemTracker(bytes);
- }
-
- // Update the 'child_reservations_' on the common ancestor if needed.
- // Case 1: reservation was pushed up to 'other'.
- if (common_ancestor == other) {
- std::lock_guard<SpinLock> l(other->lock_);
- other->child_reservations_ -= bytes;
- other->CheckConsistency();
- }
- // Case 2: reservation was pushed down below 'this'.
- if (common_ancestor == this) {
- std::lock_guard<SpinLock> l(lock_);
- child_reservations_ += bytes;
- CheckConsistency();
- }
- return true;
-}
-
-std::vector<ReservationTracker*> ReservationTracker::FindPathToRoot() {
- std::vector<ReservationTracker*> path_to_root;
- ReservationTracker* curr = this;
- do {
- path_to_root.push_back(curr);
- curr = curr->parent_;
- } while (curr != nullptr);
- return path_to_root;
-}
-
-void ReservationTracker::AllocateFrom(int64_t bytes) {
- std::lock_guard<SpinLock> l(lock_);
- DCHECK(initialized_);
- DCHECK_GE(bytes, 0);
- DCHECK_LE(bytes, unused_reservation());
- UpdateUsedReservation(bytes);
- CheckConsistency();
-}
-
-void ReservationTracker::ReleaseTo(int64_t bytes) {
- std::lock_guard<SpinLock> l(lock_);
- DCHECK(initialized_);
- DCHECK_GE(bytes, 0);
- DCHECK_LE(bytes, used_reservation_);
- UpdateUsedReservation(-bytes);
- CheckConsistency();
-}
-
-int64_t ReservationTracker::GetReservation() {
- std::lock_guard<SpinLock> l(lock_);
- DCHECK(initialized_);
- return reservation_;
-}
-
-int64_t ReservationTracker::GetUsedReservation() {
- std::lock_guard<SpinLock> l(lock_);
- DCHECK(initialized_);
- return used_reservation_;
-}
-
-int64_t ReservationTracker::GetUnusedReservation() {
- std::lock_guard<SpinLock> l(lock_);
- DCHECK(initialized_);
- return unused_reservation();
-}
-
-int64_t ReservationTracker::GetChildReservations() {
- std::lock_guard<SpinLock> l(lock_);
- DCHECK(initialized_);
- return child_reservations_;
-}
-
-void ReservationTracker::CheckConsistency() const {
- // Check internal invariants.
- DCHECK_GE(reservation_, 0);
- DCHECK_LE(reservation_, reservation_limit_);
- DCHECK_GE(child_reservations_, 0);
- DCHECK_GE(used_reservation_, 0);
- DCHECK_LE(used_reservation_ + child_reservations_, reservation_);
-
- DCHECK_EQ(reservation_, counters_.peak_reservation->current_value());
- DCHECK_LE(reservation_, counters_.peak_reservation->value());
- DCHECK_EQ(used_reservation_,
counters_.peak_used_reservation->current_value());
- DCHECK_LE(used_reservation_, counters_.peak_used_reservation->value());
- if (counters_.reservation_limit != nullptr) {
- DCHECK_EQ(reservation_limit_, counters_.reservation_limit->value());
- }
-}
-
-void ReservationTracker::UpdateUsedReservation(int64_t delta) {
- used_reservation_ += delta;
- COUNTER_SET(counters_.peak_used_reservation, used_reservation_);
- VLOG_QUERY << "peak:" << counters_.peak_reservation->current_value()
- << " used reservation:" << reservation_;
- CheckConsistency();
-}
-
-void ReservationTracker::UpdateReservation(int64_t delta) {
- reservation_ += delta;
- //LOG(INFO) << "chenhao tracker:" << tracker_name_ << " reservation:" <<
reservation_
- // << " delta:" << delta << " limit:" << reservation_limit_;
- COUNTER_SET(counters_.peak_reservation, reservation_);
- counters_.peak_reservation->set(reservation_);
- CheckConsistency();
-}
-
-std::string ReservationTracker::DebugString() {
- //std::lock_guard<SpinLock> l(lock_);
- if (!initialized_) return "<ReservationTracker>: uninitialized";
-
- std::string parent_debug_string = parent_ == nullptr ? "NULL" :
parent_->DebugString();
- std::stringstream ss;
- ss << "<ReservationTracker>: reservation_limit " << reservation_limit_ <<
" reservation "
- << reservation_ << " used_reservation " << used_reservation_ << "
child_reservations "
- << child_reservations_ << " parent:\n"
- << parent_debug_string;
- return ss.str();
-}
-} // namespace doris
diff --git a/be/src/runtime/bufferpool/reservation_tracker.h
b/be/src/runtime/bufferpool/reservation_tracker.h
deleted file mode 100644
index 80408aa6eb..0000000000
--- a/be/src/runtime/bufferpool/reservation_tracker.h
+++ /dev/null
@@ -1,290 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <stdint.h>
-
-#include <string>
-
-#include "common/status.h"
-#include "runtime/bufferpool/reservation_tracker_counters.h"
-#include "util/spinlock.h"
-
-namespace doris {
-
-class DummyProfile;
-class MemTracker;
-class RuntimeProfile;
-
-/// A tracker for a hierarchy of buffer pool memory reservations, denominated
in bytes.
-/// A hierarchy of ReservationTrackers provides a mechanism for subdividing
buffer pool
-/// memory and enforcing upper and lower bounds on memory usage.
-///
-/// The root of the tracker tree enforces a global maximum, which is
distributed among its
-/// children. Each tracker in the tree has a 'reservation': the total bytes of
buffer pool
-/// memory it is entitled to use. The reservation is inclusive of any memory
that is
-/// already allocated from the reservation, i.e. using a reservation to
allocate memory
-/// does not subtract from the reservation.
-///
-/// A reservation can be used directly at the tracker by calling
AllocateFrom(), or
-/// distributed to children of the tracker for the childrens' reservations.
Each tracker
-/// in the tree can use up to its reservation without checking parent
trackers. To
-/// increase its reservation, a tracker must use some of its parent's
reservation (and
-/// perhaps increase reservations all the way to the root of the tree).
-///
-/// Each tracker also has a maximum reservation that is enforced. E.g. if the
root of the
-/// tracker hierarchy is the global tracker for the Impala daemon and the next
level of
-/// the hierarchy is made up of per-query trackers, then the maximum
reservation
-/// mechanism can enforce both process-level and query-level limits on
reservations.
-///
-/// Invariants:
-/// * A tracker's reservation is at most its reservation limit: reservation <=
limit
-/// * A tracker's reservation is at least the sum of its childrens'
reservations plus
-/// the amount of the reservation used directly at this tracker. The
difference is
-/// the unused reservation:
-/// child_reservations + used_reservation + unused_reservation =
reservation.
-///
-/// Thread-safety:
-/// All public ReservationTracker methods are thread-safe. If multiple threads
-/// concurrently invoke methods on a ReservationTracker, each operation is
applied
-/// atomically to leave the ReservationTracker in a consistent state. Calling
threads
-/// are responsible for coordinating to avoid violating any method
preconditions,
-/// e.g. ensuring that there is sufficient unused reservation before calling
AllocateTo().
-///
-/// Integration with MemTracker hierarchy:
-/// TODO: we will remove MemTracker and this integration once all memory is
accounted via
-/// reservations.
-///
-/// Each ReservationTracker can optionally have a linked MemTracker. E.g. an
exec
-/// node's ReservationTracker can be linked with the exec node's MemTracker,
so that
-/// reservations are included in query memory consumption for the purposes of
enforcing
-/// memory limits, reporting and logging. The reservation is accounted as
consumption
-/// against the linked MemTracker and its ancestors because reserved memory is
committed.
-/// Allocating from a reservation therefore does not change the consumption
reflected in
-/// the MemTracker hierarchy.
-///
-/// MemTracker limits are only checked via the topmost link (i.e. the
query-level
-/// trackers): we require that no MemTrackers below this level have limits.
-///
-/// We require that the MemTracker hierarchy is consistent with the
ReservationTracker
-/// hierarchy. I.e. if a ReservationTracker is linked to a MemTracker "A", and
its parent
-/// is linked to a MemTracker "B", then "B" must be the parent of "A"'.
-class ReservationTracker {
-public:
- ReservationTracker();
- virtual ~ReservationTracker();
-
- /// Initializes the root tracker with the given reservation limit in
bytes. The initial
- /// reservation is 0.
- /// if 'profile' is not nullptr, the counters defined in
ReservationTrackerCounters are
- /// added to 'profile'.
- void InitRootTracker(RuntimeProfile* profile, int64_t reservation_limit);
-
- /// Initializes a new ReservationTracker with a parent.
- /// If 'mem_tracker' is not nullptr, reservations for this
ReservationTracker and its
- /// children will be counted as consumption against 'mem_tracker'.
- /// 'reservation_limit' is the maximum reservation for this tracker in
bytes.
- /// if 'profile' is not nullptr, the counters in 'counters_' are added to
'profile'.
- void InitChildTracker(RuntimeProfile* profile, ReservationTracker* parent,
- int64_t reservation_limit);
-
- /// If the tracker is initialized, deregister the ReservationTracker from
its parent,
- /// relinquishing all this tracker's reservation. All of the reservation
must be unused
- /// and all the tracker's children must be closed before calling this
method.
- /// TODO: decide on and implement policy for how far to release the
reservation up
- /// the tree. Currently the reservation is released all the way to the
root.
- void Close();
-
- /// Request to increase reservation by 'bytes'. The request is either
granted in
- /// full or not at all. Uses any unused reservation on ancestors and
increase
- /// ancestors' reservations if needed to fit the increased reservation.
- /// Returns true if the reservation increase is granted, or false if not
granted.
- /// If the reservation is not granted, no modifications are made to the
state of
- /// any ReservationTrackers.
- bool IncreaseReservation(int64_t bytes) WARN_UNUSED_RESULT;
-
- /// Tries to ensure that 'bytes' of unused reservation is available. If
not already
- /// available, tries to increase the reservation such that the unused
reservation is
- /// exactly equal to 'bytes'. Uses any unused reservation on ancestors and
increase
- /// ancestors' reservations if needed to fit the increased reservation.
- /// Returns true if the reservation increase was successful or not
necessary.
- bool IncreaseReservationToFit(int64_t bytes) WARN_UNUSED_RESULT;
-
- /// Decrease reservation by 'bytes' on this tracker and all ancestors.
This tracker's
- /// reservation must be at least 'bytes' before calling this method.
- void DecreaseReservation(int64_t bytes) { DecreaseReservation(bytes,
false); }
-
- /// Transfer reservation from this tracker to 'other'. Both trackers must
be in the
- /// same query subtree of the hierarchy. One tracker can be the ancestor
of the other,
- /// or they can share a common ancestor. The subtree root must be at the
query level
- /// or below so that the transfer cannot cause a MemTracker limit to be
exceeded
- /// (because linked MemTrackers with limits below the query level are not
supported).
- /// Returns true on success or false if the transfer would have caused a
reservation
- /// limit to be exceeded.
- bool TransferReservationTo(ReservationTracker* other, int64_t bytes)
WARN_UNUSED_RESULT;
-
- /// Allocate 'bytes' from the reservation. The tracker must have at least
'bytes'
- /// unused reservation before calling this method.
- void AllocateFrom(int64_t bytes);
-
- /// Release 'bytes' of previously allocated memory. The used reservation is
- /// decreased by 'bytes'. Before the call, the used reservation must be at
least
- /// 'bytes' before calling this method.
- void ReleaseTo(int64_t bytes);
-
- /// Returns the amount of the reservation in bytes.
- int64_t GetReservation();
-
- /// Returns the current amount of the reservation used at this tracker,
not including
- /// reservations of children in bytes.
- int64_t GetUsedReservation();
-
- /// Returns the amount of the reservation neither used nor given to
childrens'
- /// reservations at this tracker in bytes.
- int64_t GetUnusedReservation();
-
- /// Returns the total reservations of children in bytes.
- int64_t GetChildReservations();
-
- /// Support for debug actions: deny reservation increase with probability
'probability'.
- void SetDebugDenyIncreaseReservation(double probability) {
- increase_deny_probability_ = probability;
- }
-
- ReservationTracker* parent() const { return parent_; }
-
- std::string DebugString();
-
-private:
- /// Returns the amount of 'reservation_' that is unused.
- int64_t unused_reservation() const {
- return reservation_ - used_reservation_ - child_reservations_;
- }
-
- /// Returns the parent's memtracker if 'parent_' is non-nullptr, or
nullptr otherwise.
- MemTracker* GetParentMemTracker() const {
- return parent_ == nullptr ? nullptr : parent_->mem_tracker_;
- }
-
- /// Initializes 'counters_', storing the counters in 'profile'.
- /// If 'profile' is nullptr, creates a dummy profile to store the counters.
- void InitCounters(RuntimeProfile* profile, int64_t max_reservation);
-
- /// Internal helper for IncreaseReservation(). If
'use_existing_reservation' is true,
- /// increase by the minimum amount so that 'bytes' fits in the
reservation, otherwise
- /// just increase by 'bytes'. If 'is_child_reservation' is true, also
increase
- /// 'child_reservations_' by 'bytes'.
- /// 'lock_' must be held by caller.
- bool IncreaseReservationInternalLocked(int64_t bytes, bool
use_existing_reservation,
- bool is_child_reservation);
-
- /// Increase consumption on linked MemTracker to reflect an increase in
reservation
- /// of 'reservation_increase'. For the topmost link, return false if this
failed
- /// because it would exceed a memory limit. If there is no linked
MemTracker, just
- /// returns true.
- /// TODO: remove once we account all memory via ReservationTrackers.
- bool TryConsumeFromMemTracker(int64_t reservation_increase);
-
- /// Decrease consumption on linked MemTracker to reflect a decrease in
reservation of
- /// 'reservation_decrease'. If there is no linked MemTracker, does nothing.
- /// TODO: remove once we account all memory via ReservationTrackers.
- void ReleaseToMemTracker(int64_t reservation_decrease);
-
- /// Decrease reservation by 'bytes' on this tracker and all ancestors.
This tracker's
- /// reservation must be at least 'bytes' before calling this method. If
- /// 'is_child_reservation' is true it decreases 'child_reservations_' by
'bytes'
- void DecreaseReservation(int64_t bytes, bool is_child_reservation);
-
- /// Same as DecreaseReservation(), but 'lock_' must be held by caller.
- void DecreaseReservationLocked(int64_t bytes, bool is_child_reservation);
-
- /// Return a vector containing the trackers on the path to the root
tracker. Includes
- /// the current tracker and the root tracker.
- std::vector<ReservationTracker*> FindPathToRoot();
-
- /// Return true if trackers in the subtree rooted at 'subtree1' precede
trackers in
- /// the subtree rooted at 'subtree2' in the lock order. 'subtree1' and
'subtree2'
- /// must share the same parent.
- static bool lock_sibling_subtree_first(ReservationTracker* subtree1,
- ReservationTracker* subtree2) {
- DCHECK_EQ(subtree1->parent_, subtree2->parent_);
- return reinterpret_cast<uintptr_t>(subtree1) <
reinterpret_cast<uintptr_t>(subtree2);
- }
-
- /// Check the internal consistency of the ReservationTracker and DCHECKs
if in an
- /// inconsistent state.
- /// 'lock_' must be held by caller.
- void CheckConsistency() const;
-
- /// Increase or decrease 'used_reservation_' and update profile counters
accordingly.
- /// 'lock_' must be held by caller.
- void UpdateUsedReservation(int64_t delta);
-
- /// Increase or decrease 'reservation_' and update profile counters
accordingly.
- /// 'lock_' must be held by caller.
- void UpdateReservation(int64_t delta);
-
- /// Support for debug actions: see SetDebugDenyIncreaseReservation() for
behaviour.
- double increase_deny_probability_ = 0.0;
-
- /// lock_ protects all below members. The lock order in a tree of
ReservationTrackers is
- /// based on a post-order traversal of the tree, with children visited in
order of the
- /// memory address of the ReservationTracker object. The following rules
can be applied
- /// to determine the relative positions of two trackers t1 and t2 in the
lock order:
- /// * If t1 is a descendent of t2, t1's lock must be acquired before t2's
lock (i.e.
- /// locks are acquired bottom-up).
- /// * If neither t1 or t2 is a descendant of the other, they must be in
subtrees of
- /// under a common ancestor. If the memory address of t1's subtree's
root is less
- /// than the memory address of t2's subtree's root, t1's lock must be
acquired before
- /// t2's lock. This check is implemented in lock_sibling_subtree_first().
- SpinLock lock_;
-
- /// True if the tracker is initialized.
- bool initialized_ = false;
-
- /// A dummy profile to hold the counters in 'counters_' in the case that
no profile
- /// is provided.
- std::unique_ptr<DummyProfile> dummy_profile_;
-
- /// The RuntimeProfile counters for this tracker.
- /// All non-nullptr if 'initialized_' is true.
- ReservationTrackerCounters counters_;
-
- /// The parent of this tracker in the hierarchy. Does not change after
initialization.
- ReservationTracker* parent_ = nullptr;
-
- /// If non-nullptr, reservations are counted as memory consumption against
this tracker.
- /// Does not change after initialization. Not owned.
- /// TODO: remove once all memory is accounted via ReservationTrackers.
- MemTracker* mem_tracker_ = nullptr;
-
- /// The maximum reservation in bytes that this tracker can have.
- int64_t reservation_limit_;
-
- /// This tracker's current reservation in bytes. 'reservation_' <=
'reservation_limit_'.
- int64_t reservation_;
-
- /// Total reservation of children in bytes. This is included in
'reservation_'.
- /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'.
- int64_t child_reservations_;
-
- /// The amount of the reservation currently used by this tracker in bytes.
- /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'.
- int64_t used_reservation_;
-};
-} // namespace doris
diff --git a/be/src/runtime/bufferpool/reservation_tracker_counters.h
b/be/src/runtime/bufferpool/reservation_tracker_counters.h
deleted file mode 100644
index 4383c6819c..0000000000
--- a/be/src/runtime/bufferpool/reservation_tracker_counters.h
+++ /dev/null
@@ -1,38 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "util/runtime_profile.h"
-
-namespace doris {
-
-/// A set of counters for each ReservationTracker for reporting purposes.
-///
-/// If the ReservationTracker is linked to a profile these have the same
lifetime as that
-/// profile, otherwise they have the same lifetime as the ReservationTracker
itself.
-struct ReservationTrackerCounters {
- /// The tracker's peak reservation in bytes.
- RuntimeProfile::HighWaterMarkCounter* peak_reservation;
-
- /// The tracker's peak usage in bytes.
- RuntimeProfile::HighWaterMarkCounter* peak_used_reservation;
-
- /// The hard limit on the tracker's reservations
- RuntimeProfile::Counter* reservation_limit;
-};
-} // namespace doris
diff --git a/be/src/runtime/bufferpool/reservation_util.cc
b/be/src/runtime/bufferpool/reservation_util.cc
deleted file mode 100644
index 2f7350c3ea..0000000000
--- a/be/src/runtime/bufferpool/reservation_util.cc
+++ /dev/null
@@ -1,40 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/bufferpool/reservation_util.h"
-
-#include <algorithm>
-
-namespace doris {
-
-// Most operators that accumulate memory use reservations, so the majority of
memory
-// should be allocated to buffer reservations, as a heuristic.
-const double ReservationUtil::RESERVATION_MEM_FRACTION = 0.8;
-const int64_t ReservationUtil::RESERVATION_MEM_MIN_REMAINING = 75 * 1024 *
1024;
-
-int64_t ReservationUtil::GetReservationLimitFromMemLimit(int64_t mem_limit) {
- int64_t max_reservation = std::min<int64_t>(RESERVATION_MEM_FRACTION *
mem_limit,
- mem_limit -
RESERVATION_MEM_MIN_REMAINING);
- return std::max<int64_t>(0, max_reservation);
-}
-
-int64_t ReservationUtil::GetMinMemLimitFromReservation(int64_t
buffer_reservation) {
- buffer_reservation = std::max<int64_t>(0, buffer_reservation);
- return std::max<int64_t>(buffer_reservation * (1.0 /
ReservationUtil::RESERVATION_MEM_FRACTION),
- buffer_reservation +
ReservationUtil::RESERVATION_MEM_MIN_REMAINING);
-}
-} // namespace doris
diff --git a/be/src/runtime/bufferpool/reservation_util.h
b/be/src/runtime/bufferpool/reservation_util.h
deleted file mode 100644
index 606e2f6475..0000000000
--- a/be/src/runtime/bufferpool/reservation_util.h
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <stdint.h>
-
-namespace doris {
-
-/// Utility code related to buffer reservations.
-class ReservationUtil {
-public:
- /// There are currently two classes of memory: reserved memory (i.e.
memory that is
- /// reserved with reservation trackers/allocated by the buffer pool), and
unreserved
- /// memory (i.e. everything else; code that hasn't yet been updated to use
reserved
- /// memory). Eventually, all memory should be in the former category, but
each operator
- /// must be converted to use reserved memory and that work is ongoing. See
IMPALA-4834.
- /// In the meantime, the system memory must be shared between these two
classes of
- /// memory. RESERVATION_MEM_FRACTION and RESERVATION_MEM_MIN_REMAINING are
used to
- /// determine an upper bound on reserved memory for a query. Operators
operate reliably
- /// when they are using bounded reserved memory (e.g. staying under a
limit by
- /// spilling), but will generally fail if they hit a limit when trying to
allocate
- /// unreserved memory. Thus we need to ensure there is always space left
in the query
- /// memory limit for unreserved memory.
-
- /// The fraction of the query mem limit that is used as the maximum buffer
reservation
- /// limit, i.e. the bound on reserved memory. It is expected that
unreserved memory
- /// (i.e. not accounted by buffer reservation trackers) stays within
- /// (1 - RESERVATION_MEM_FRACTION).
- /// TODO: remove once all operators use buffer reservations.
- static const double RESERVATION_MEM_FRACTION;
-
- /// The minimum amount of memory that should be left after buffer
reservations, i.e.
- /// this is the minimum amount of memory that should be left for
unreserved memory.
- /// TODO: remove once all operators use buffer reservations.
- static const int64_t RESERVATION_MEM_MIN_REMAINING;
-
- /// Helper function to get the query buffer reservation limit (in bytes)
given a query
- /// mem_limit. In other words, this determines the maximum portion of the
mem_limit
- /// that should go to reserved memory. The limit on reservations is
computed as:
- /// min(query_limit * RESERVATION_MEM_FRACTION,
- /// query_limit - RESERVATION_MEM_MIN_REMAINING)
- /// TODO: remove once all operators use buffer reservations.
- static int64_t GetReservationLimitFromMemLimit(int64_t mem_limit);
-
- /// Helper function to get the minimum query mem_limit (in bytes) that
will be large
- /// enough for a buffer reservation of size 'buffer_reservation' bytes. In
other words,
- /// this determines the minimum mem_limit that will be large enough to
accomidate
- /// 'buffer_reservation' reserved memory, as well as some amount of
unreserved memory
- /// (determined by a heuristic).
- /// The returned mem_limit X satisfies:
- /// buffer_reservation <= GetReservationLimitFromMemLimit(X)
- /// TODO: remove once all operators use buffer reservations.
- static int64_t GetMinMemLimitFromReservation(int64_t buffer_reservation);
-};
-
-} // namespace doris
diff --git a/be/src/runtime/bufferpool/suballocator.cc
b/be/src/runtime/bufferpool/suballocator.cc
index 021a4922df..f26aee6205 100644
--- a/be/src/runtime/bufferpool/suballocator.cc
+++ b/be/src/runtime/bufferpool/suballocator.cc
@@ -20,7 +20,6 @@
#include <new>
#include "gutil/strings/substitute.h"
-#include "runtime/bufferpool/reservation_tracker.h"
#include "util/bit_util.h"
namespace doris {
@@ -97,10 +96,6 @@ uint64_t Suballocator::ComputeAllocateBufferSize(int64_t
bytes) const {
Status Suballocator::AllocateBuffer(int64_t bytes,
std::unique_ptr<Suballocation>* result) {
DCHECK_LE(bytes, MAX_ALLOCATION_BYTES);
const int64_t buffer_len = std::max(min_buffer_len_,
BitUtil::RoundUpToPowerOfTwo(bytes));
- if (!client_->IncreaseReservationToFit(buffer_len)) {
- *result = nullptr;
- return Status::OK();
- }
std::unique_ptr<Suballocation> free_node;
RETURN_IF_ERROR(Suballocation::Create(&free_node));
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index bb10b75aaa..0cd9878046 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -48,7 +48,6 @@ class StorageEngine;
class MemTrackerTaskPool;
class PriorityThreadPool;
class PriorityWorkStealingThreadPool;
-class ReservationTracker;
class ResultBufferMgr;
class ResultQueueMgr;
class TMasterInfo;
@@ -141,7 +140,6 @@ public:
BrpcClientCache<PFunctionService_Stub>* brpc_function_client_cache() const
{
return _function_client_cache;
}
- ReservationTracker* buffer_reservation() { return _buffer_reservation; }
BufferPool* buffer_pool() { return _buffer_pool; }
LoadChannelMgr* load_channel_mgr() { return _load_channel_mgr; }
LoadStreamMgr* load_stream_mgr() { return _load_stream_mgr; }
@@ -163,7 +161,7 @@ private:
void _destroy();
Status _init_mem_tracker();
- /// Initialise 'buffer_pool_' and 'buffer_reservation_' with given
capacity.
+ /// Initialise 'buffer_pool_' with given capacity.
void _init_buffer_pool(int64_t min_page_len, int64_t capacity, int64_t
clean_pages_limit);
void _register_metrics();
@@ -224,7 +222,6 @@ private:
BrpcClientCache<PBackendService_Stub>* _internal_client_cache = nullptr;
BrpcClientCache<PFunctionService_Stub>* _function_client_cache = nullptr;
- ReservationTracker* _buffer_reservation = nullptr;
BufferPool* _buffer_pool = nullptr;
StorageEngine* _storage_engine = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 3057681596..d628ec7c84 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -27,7 +27,6 @@
#include "olap/storage_policy_mgr.h"
#include "runtime/broker_mgr.h"
#include "runtime/bufferpool/buffer_pool.h"
-#include "runtime/bufferpool/reservation_tracker.h"
#include "runtime/cache/result_cache.h"
#include "runtime/client_cache.h"
#include "runtime/data_stream_mgr.h"
@@ -311,8 +310,6 @@ void ExecEnv::_init_buffer_pool(int64_t min_page_size,
int64_t capacity,
int64_t clean_pages_limit) {
DCHECK(_buffer_pool == nullptr);
_buffer_pool = new BufferPool(min_page_size, capacity, clean_pages_limit);
- _buffer_reservation = new ReservationTracker();
- _buffer_reservation->InitRootTracker(nullptr, capacity);
}
void ExecEnv::_register_metrics() {
@@ -364,7 +361,6 @@ void ExecEnv::_destroy() {
SAFE_DELETE(_external_scan_context_mgr);
SAFE_DELETE(_heartbeat_flags);
SAFE_DELETE(_task_pool_mem_tracker_registry);
- SAFE_DELETE(_buffer_reservation);
SAFE_DELETE(_scanner_scheduler);
DEREGISTER_HOOK_METRIC(query_mem_consumption);
diff --git a/be/src/runtime/initial_reservations.cc
b/be/src/runtime/initial_reservations.cc
deleted file mode 100644
index 73757eaac4..0000000000
--- a/be/src/runtime/initial_reservations.cc
+++ /dev/null
@@ -1,83 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-// This file is copied from
-//
https://github.com/apache/impala/blob/branch-2.10.0/be/src/runtime/initial-reservations.cc
-// and modified by Doris
-
-#include "runtime/initial_reservations.h"
-
-#include <limits>
-#include <mutex>
-
-#include "common/logging.h"
-#include "common/object_pool.h"
-#include "runtime/exec_env.h"
-#include "util/pretty_printer.h"
-#include "util/uid_util.h"
-
-using std::numeric_limits;
-
-namespace doris {
-
-InitialReservations::InitialReservations(ObjectPool* obj_pool,
- ReservationTracker* query_reservation,
- int64_t
initial_reservation_total_claims)
- :
remaining_initial_reservation_claims_(initial_reservation_total_claims) {
- initial_reservations_.InitChildTracker(nullptr, query_reservation,
- numeric_limits<int64_t>::max());
-}
-
-Status InitialReservations::Init(const TUniqueId& query_id, int64_t
query_min_reservation) {
- DCHECK_EQ(0, initial_reservations_.GetReservation()) << "Already inited";
- if (!initial_reservations_.IncreaseReservation(query_min_reservation)) {
- std::stringstream ss;
- ss << "Minimum reservation unavailable: " << query_min_reservation
- << " query id:" << query_id;
- return Status::MinimumReservationUnavailable(ss.str());
- }
- VLOG_QUERY << "Successfully claimed initial reservations ("
- << PrettyPrinter::print(query_min_reservation, TUnit::BYTES) <<
") for"
- << " query " << print_id(query_id);
- return Status::OK();
-}
-
-void InitialReservations::Claim(BufferPool::ClientHandle* dst, int64_t bytes) {
- DCHECK_GE(bytes, 0);
- std::lock_guard<SpinLock> l(lock_);
- DCHECK_LE(bytes, remaining_initial_reservation_claims_);
- bool success = dst->TransferReservationFrom(&initial_reservations_, bytes);
- DCHECK(success) << "Planner computation should ensure enough initial
reservations";
- remaining_initial_reservation_claims_ -= bytes;
-}
-
-void InitialReservations::Return(BufferPool::ClientHandle* src, int64_t bytes)
{
- std::lock_guard<SpinLock> l(lock_);
- bool success = src->TransferReservationTo(&initial_reservations_, bytes);
- // No limits on our tracker - no way this should fail.
- DCHECK(success);
- // Check to see if we can release any reservation.
- int64_t excess_reservation =
- initial_reservations_.GetReservation() -
remaining_initial_reservation_claims_;
- if (excess_reservation > 0) {
- initial_reservations_.DecreaseReservation(excess_reservation);
- }
-}
-
-void InitialReservations::ReleaseResources() {
- initial_reservations_.Close();
-}
-} // namespace doris
diff --git a/be/src/runtime/initial_reservations.h
b/be/src/runtime/initial_reservations.h
deleted file mode 100644
index 9ffb3ab367..0000000000
--- a/be/src/runtime/initial_reservations.h
+++ /dev/null
@@ -1,78 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-// This file is copied from
-//
https://github.com/apache/impala/blob/branch-2.10.0/be/src/runtime/initial-reservations.h
-// and modified by Doris
-
-#pragma once
-
-#include "common/status.h"
-#include "gen_cpp/Types_types.h" // for TUniqueId
-#include "runtime/bufferpool/buffer_pool.h"
-#include "runtime/bufferpool/reservation_tracker.h"
-#include "util/spinlock.h"
-
-namespace doris {
-
-class ObjectPool;
-
-/**
- * Manages the pool of initial reservations for different nodes in the plan
tree.
- * Each plan node and sink claims its initial reservation from here, then
returns it when
- * it is done executing. The frontend is responsible for making sure that
enough initial
- * reservation is in this pool for all of the concurrent claims.
- */
-class InitialReservations {
-public:
- /// 'query_reservation' and 'query_mem_tracker' are the top-level trackers
for the
- /// query. This creates trackers for initial reservations under those.
- /// 'initial_reservation_total_claims' is the total of initial
reservations that will be
- /// claimed over the lifetime of the query. The total bytes claimed via
Claim()
- /// cannot exceed this. Allocated objects are stored in 'obj_pool'.
- InitialReservations(ObjectPool* obj_pool, ReservationTracker*
query_reservation,
- int64_t initial_reservation_total_claims);
-
- /// Initialize the query's pool of initial reservations by acquiring the
minimum
- /// reservation required for the query on this host. Fails if the
reservation could
- /// not be acquired, e.g. because it would exceed a pool or process limit.
- Status Init(const TUniqueId& query_id, int64_t query_min_reservation)
WARN_UNUSED_RESULT;
-
- /// Claim the initial reservation of 'bytes' for 'dst'. Assumes that the
transfer will
- /// not violate any reservation limits on 'dst'.
- void Claim(BufferPool::ClientHandle* dst, int64_t bytes);
-
- /// Return the initial reservation of 'bytes' from 'src'. The reservation
is returned
- /// to the pool of reservations if it may be needed to satisfy a
subsequent claim or
- /// otherwise is released.
- void Return(BufferPool::ClientHandle* src, int64_t bytes);
-
- /// Release any reservations held onto by this object.
- void ReleaseResources();
-
-private:
- // Protects all below members to ensure that the internal state is
consistent.
- SpinLock lock_;
-
- // The pool of initial reservations that Claim() returns reservations from
and
- // Return() returns reservations to.
- ReservationTracker initial_reservations_;
-
- /// The total bytes of additional reservations that we expect to be
claimed.
- /// initial_reservations_->GetReservation() <=
remaining_initial_reservation_claims_.
- int64_t remaining_initial_reservation_claims_;
-};
-} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 84209888ef..df7becf04d 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -252,29 +252,30 @@ Status MemTrackerLimiter::mem_limit_exceeded_log(const
std::string& msg) {
Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, int64_t
failed_consume_size) {
STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
DCHECK(!_limited_ancestors.empty());
+ std::string detail = fmt::format("memory limit
exceeded:<consumed_tracker={}, ", _label);
+ if (failed_consume_size != 0)
+ detail += fmt::format("need_size={}, ",
+ PrettyPrinter::print(failed_consume_size,
TUnit::BYTES));
+ MemTrackerLimiter* exceeded_tracker = this;
+ int64_t free_size = INT_MAX;
for (const auto& tracker : _limited_ancestors) {
- if (tracker->has_limit() &&
- tracker->limit() < tracker->peak_consumption() +
failed_consume_size) {
- std::string detail;
- if (failed_consume_size != 0) {
- detail = fmt::format(
- "memory limit exceeded:<consumed_tracker={},
need_size={}, "
- "exceeded_tracker={}, limit={}, peak_used={},
current_used={}>, "
- "executing:<{}>",
- _label, PrettyPrinter::print(failed_consume_size,
TUnit::BYTES),
- tracker->label(), tracker->limit(),
tracker->peak_consumption(),
- tracker->consumption(), msg);
- } else {
- detail = fmt::format(
- "memory limit exceeded:<exceeded_tracker={}, limit={},
peak_used={}, "
- "current_used={}>, executing:<{}>",
- tracker->label(), tracker->limit(),
tracker->peak_consumption(),
- tracker->consumption(), msg);
- }
- return tracker->mem_limit_exceeded_log(detail);
+ int64_t max_consumption = tracker->peak_consumption() >
tracker->consumption()
+ ? tracker->peak_consumption()
+ : tracker->consumption();
+ if (tracker->has_limit() && tracker->limit() < max_consumption +
failed_consume_size) {
+ exceeded_tracker = tracker;
+ break;
+ }
+ if (tracker->has_limit() && tracker->limit() - max_consumption <
free_size) {
+ free_size = tracker->limit() - max_consumption;
+ exceeded_tracker = tracker;
}
}
- return Status::MemoryLimitExceeded("no mem tracker exceed limit");
+ detail += fmt::format(
+ "exceeded_tracker={}, limit={}, peak_used={}, current_used={}>,
executing_msg:<{}>",
+ exceeded_tracker->label(), exceeded_tracker->limit(),
+ exceeded_tracker->peak_consumption(),
exceeded_tracker->consumption(), msg);
+ return exceeded_tracker->mem_limit_exceeded_log(detail);
}
Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
@@ -282,8 +283,8 @@ Status MemTrackerLimiter::mem_limit_exceeded(const
std::string& msg,
Status failed_try_consume_st) {
STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
std::string detail =
- fmt::format("memory limit exceeded:<consumed_tracker={}, {}>,
executing:<{}>", _label,
- failed_try_consume_st.get_error_msg(), msg);
+ fmt::format("memory limit exceeded:<consumed_tracker={}, {}>,
executing_msg:<{}>",
+ _label, failed_try_consume_st.get_error_msg(), msg);
return failed_tracker->mem_limit_exceeded_log(detail);
}
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 208130f928..e8a5e2846c 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -31,10 +31,7 @@
#include "common/status.h"
#include "exec/exec_node.h"
#include "runtime/buffered_block_mgr2.h"
-#include "runtime/bufferpool/reservation_tracker.h"
-#include "runtime/bufferpool/reservation_util.h"
#include "runtime/exec_env.h"
-#include "runtime/initial_reservations.h"
#include "runtime/load_path_mgr.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_task_pool.h"
@@ -68,8 +65,7 @@ RuntimeState::RuntimeState(const TUniqueId&
fragment_instance_id,
_normal_row_number(0),
_error_row_number(0),
_error_log_file_path(""),
- _error_log_file(nullptr),
- _instance_buffer_reservation(new ReservationTracker) {
+ _error_log_file(nullptr) {
Status status = init(fragment_instance_id, query_options, query_globals,
exec_env);
DCHECK(status.ok());
}
@@ -94,8 +90,7 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams&
fragment_exec_params,
_normal_row_number(0),
_error_row_number(0),
_error_log_file_path(""),
- _error_log_file(nullptr),
- _instance_buffer_reservation(new ReservationTracker) {
+ _error_log_file(nullptr) {
if (fragment_exec_params.__isset.runtime_filter_params) {
_runtime_filter_mgr->set_runtime_filter_params(fragment_exec_params.runtime_filter_params);
}
@@ -157,19 +152,6 @@ RuntimeState::~RuntimeState() {
_error_hub->close();
}
- // Release the reservation, which should be unused at the point.
- if (_instance_buffer_reservation != nullptr) {
- _instance_buffer_reservation->Close();
- }
-
- if (_initial_reservations != nullptr) {
- _initial_reservations->ReleaseResources();
- }
-
- if (_buffer_reservation != nullptr) {
- _buffer_reservation->Close();
- }
-
// Manually release the child mem tracker before _instance_mem_tracker is
destructed.
_obj_pool->clear();
_runtime_filter_mgr.reset();
@@ -248,17 +230,6 @@ Status RuntimeState::init_mem_trackers(const TUniqueId&
query_id) {
-1, "RuntimeState:instance:" + print_id(_fragment_instance_id),
_query_mem_tracker,
&_profile);
- RETURN_IF_ERROR(init_buffer_poolstate());
-
- _initial_reservations = _obj_pool->add(new InitialReservations(
- _obj_pool.get(), _buffer_reservation,
_query_options.initial_reservation_total_claims));
- RETURN_IF_ERROR(_initial_reservations->Init(_query_id, min_reservation()));
- DCHECK_EQ(0, _initial_reservation_refcnt.load());
-
- if (_instance_buffer_reservation != nullptr) {
- _instance_buffer_reservation->InitChildTracker(&_profile,
_buffer_reservation,
-
std::numeric_limits<int64_t>::max());
- }
return Status::OK();
}
@@ -268,29 +239,6 @@ Status RuntimeState::init_instance_mem_tracker() {
return Status::OK();
}
-Status RuntimeState::init_buffer_poolstate() {
- ExecEnv* exec_env = ExecEnv::GetInstance();
- int64_t mem_limit = _query_mem_tracker->get_lowest_limit();
- int64_t max_reservation;
- if (query_options().__isset.buffer_pool_limit &&
query_options().buffer_pool_limit > 0) {
- max_reservation = query_options().buffer_pool_limit;
- } else if (mem_limit == -1) {
- // No query mem limit. The process-wide reservation limit is the only
limit on
- // reservations.
- max_reservation = std::numeric_limits<int64_t>::max();
- } else {
- DCHECK_GE(mem_limit, 0);
- max_reservation =
ReservationUtil::GetReservationLimitFromMemLimit(mem_limit);
- }
-
- VLOG_QUERY << "Buffer pool limit for " << print_id(_query_id) << ": " <<
max_reservation;
-
- _buffer_reservation = _obj_pool->add(new ReservationTracker);
- _buffer_reservation->InitChildTracker(nullptr,
exec_env->buffer_reservation(), max_reservation);
-
- return Status::OK();
-}
-
Status RuntimeState::create_block_mgr() {
DCHECK(_block_mgr2.get() == nullptr);
RETURN_IF_ERROR(BufferedBlockMgr2::create(this, runtime_profile(),
_exec_env->tmp_file_mgr(),
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 033f19ae3f..46838af5cb 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -56,8 +56,6 @@ class TmpFileMgr;
class BufferedBlockMgr;
class BufferedBlockMgr2;
class LoadErrorHub;
-class ReservationTracker;
-class InitialReservations;
class RowDescriptor;
class RuntimeFilterMgr;
@@ -95,9 +93,6 @@ public:
// for ut only
Status init_instance_mem_tracker();
- /// Called from Init() to set up buffer reservations and the file group.
- Status init_buffer_poolstate();
-
// Gets/Creates the query wide block mgr.
Status create_block_mgr();
@@ -319,12 +314,6 @@ public:
int num_per_fragment_instances() const { return
_num_per_fragment_instances; }
- ReservationTracker* instance_buffer_reservation() { return
_instance_buffer_reservation.get(); }
-
- int64_t min_reservation() const { return _query_options.min_reservation; }
-
- int64_t max_reservation() const { return _query_options.max_reservation; }
-
bool disable_stream_preaggregations() const {
return _query_options.disable_stream_preaggregations;
}
@@ -360,11 +349,6 @@ public:
return segment_v2::CompressionTypePB::SNAPPY;
}
- // the following getters are only valid after Prepare()
- InitialReservations* initial_reservations() const { return
_initial_reservations; }
-
- ReservationTracker* buffer_reservation() const { return
_buffer_reservation; }
-
const std::vector<TTabletCommitInfo>& tablet_commit_infos() const {
return _tablet_commit_infos;
}
@@ -515,26 +499,6 @@ private:
std::vector<TTabletCommitInfo> _tablet_commit_infos;
std::vector<TErrorTabletInfo> _error_tablet_infos;
- //TODO chenhao , remove this to QueryState
- /// Pool of buffer reservations used to distribute initial reservations to
operators
- /// in the query. Contains a ReservationTracker that is a child of
- /// 'buffer_reservation_'. Owned by 'obj_pool_'. Set in Prepare().
- ReservationTracker* _buffer_reservation = nullptr;
-
- /// Buffer reservation for this fragment instance - a child of the query
buffer
- /// reservation. Non-nullptr if 'query_state_' is not nullptr.
- std::unique_ptr<ReservationTracker> _instance_buffer_reservation;
-
- /// Pool of buffer reservations used to distribute initial reservations to
operators
- /// in the query. Contains a ReservationTracker that is a child of
- /// 'buffer_reservation_'. Owned by 'obj_pool_'. Set in Prepare().
- InitialReservations* _initial_reservations = nullptr;
-
- /// Number of fragment instances executing, which may need to claim
- /// from 'initial_reservations_'.
- /// TODO: not needed if we call ReleaseResources() in a timely manner
(IMPALA-1575).
- std::atomic<int32_t> _initial_reservation_refcnt {0};
-
QueryFragmentsCtx* _query_ctx;
// true if max_filter_ratio is 0
diff --git a/be/test/exec/tablet_sink_test.cpp
b/be/test/exec/tablet_sink_test.cpp
index 812313750f..4e2d36bc88 100644
--- a/be/test/exec/tablet_sink_test.cpp
+++ b/be/test/exec/tablet_sink_test.cpp
@@ -22,7 +22,6 @@
#include "common/config.h"
#include "gen_cpp/HeartbeatService_types.h"
#include "gen_cpp/internal_service.pb.h"
-#include "runtime/bufferpool/reservation_tracker.h"
#include "runtime/decimalv2_value.h"
#include "runtime/descriptor_helper.h"
#include "runtime/exec_env.h"
@@ -57,7 +56,6 @@ public:
_env->_load_stream_mgr = new LoadStreamMgr();
_env->_internal_client_cache = new
BrpcClientCache<PBackendService_Stub>();
_env->_function_client_cache = new
BrpcClientCache<PFunctionService_Stub>();
- _env->_buffer_reservation = new ReservationTracker();
_env->_task_pool_mem_tracker_registry = new MemTrackerTaskPool();
ThreadPoolBuilder("SendBatchThreadPool")
.set_min_threads(1)
@@ -74,7 +72,6 @@ public:
SAFE_DELETE(_env->_load_stream_mgr);
SAFE_DELETE(_env->_master_info);
SAFE_DELETE(_env->_thread_mgr);
- SAFE_DELETE(_env->_buffer_reservation);
SAFE_DELETE(_env->_task_pool_mem_tracker_registry);
if (_server) {
_server->Stop(100);
diff --git a/be/test/runtime/test_env.cc b/be/test/runtime/test_env.cc
index 6892cafd31..e48dba9f42 100644
--- a/be/test/runtime/test_env.cc
+++ b/be/test/runtime/test_env.cc
@@ -22,8 +22,8 @@
#include <memory>
#include "olap/storage_engine.h"
+#include "runtime/bufferpool/buffer_pool.h"
#include "runtime/fragment_mgr.h"
-#include "runtime/initial_reservations.h"
#include "runtime/memory/mem_tracker_task_pool.h"
#include "runtime/result_queue_mgr.h"
#include "util/disk_info.h"
@@ -35,7 +35,6 @@ TestEnv::TestEnv() {
// Some code will use ExecEnv::GetInstance(), so init the global ExecEnv
singleton
_exec_env = ExecEnv::GetInstance();
_exec_env->_thread_mgr = new ThreadResourceMgr(2);
- _exec_env->_buffer_reservation = new ReservationTracker();
_exec_env->_task_pool_mem_tracker_registry = new MemTrackerTaskPool();
_exec_env->_disk_io_mgr = new DiskIoMgr(1, 1, 1, 10);
_exec_env->disk_io_mgr()->init(-1);
@@ -64,7 +63,6 @@ TestEnv::~TestEnv() {
SAFE_DELETE(_exec_env->_scan_thread_pool);
SAFE_DELETE(_exec_env->_disk_io_mgr);
SAFE_DELETE(_exec_env->_task_pool_mem_tracker_registry);
- SAFE_DELETE(_exec_env->_buffer_reservation);
SAFE_DELETE(_exec_env->_thread_mgr);
if (_engine == StorageEngine::_s_instance) {
diff --git a/be/test/util/arrow/arrow_work_flow_test.cpp
b/be/test/util/arrow/arrow_work_flow_test.cpp
index c160047de4..240a9e7ea2 100644
--- a/be/test/util/arrow/arrow_work_flow_test.cpp
+++ b/be/test/util/arrow/arrow_work_flow_test.cpp
@@ -28,7 +28,6 @@
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/Types_types.h"
#include "olap/row.h"
-#include "runtime/bufferpool/reservation_tracker.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker_task_pool.h"
#include "runtime/result_queue_mgr.h"
@@ -68,7 +67,6 @@ protected:
if (_exec_env) {
delete _exec_env->_result_queue_mgr;
delete _exec_env->_thread_mgr;
- delete _exec_env->_buffer_reservation;
delete _exec_env->_task_pool_mem_tracker_registry;
}
}
@@ -95,7 +93,6 @@ void ArrowWorkFlowTest::init() {
void ArrowWorkFlowTest::init_runtime_state() {
_exec_env->_result_queue_mgr = new ResultQueueMgr();
_exec_env->_thread_mgr = new ThreadResourceMgr();
- _exec_env->_buffer_reservation = new ReservationTracker();
_exec_env->_task_pool_mem_tracker_registry = new MemTrackerTaskPool();
_exec_env->_is_init = true;
TQueryOptions query_options;
diff --git a/be/test/vec/exec/vtablet_sink_test.cpp
b/be/test/vec/exec/vtablet_sink_test.cpp
index 39d98e38ad..f2b7248fce 100644
--- a/be/test/vec/exec/vtablet_sink_test.cpp
+++ b/be/test/vec/exec/vtablet_sink_test.cpp
@@ -25,7 +25,6 @@
#include "common/config.h"
#include "gen_cpp/HeartbeatService_types.h"
#include "gen_cpp/internal_service.pb.h"
-#include "runtime/bufferpool/reservation_tracker.h"
#include "runtime/decimalv2_value.h"
#include "runtime/descriptor_helper.h"
#include "runtime/exec_env.h"
@@ -59,7 +58,6 @@ public:
_env->_load_stream_mgr = new LoadStreamMgr();
_env->_internal_client_cache = new
BrpcClientCache<PBackendService_Stub>();
_env->_function_client_cache = new
BrpcClientCache<PFunctionService_Stub>();
- _env->_buffer_reservation = new ReservationTracker();
_env->_task_pool_mem_tracker_registry = new MemTrackerTaskPool();
ThreadPoolBuilder("SendBatchThreadPool")
.set_min_threads(1)
@@ -76,7 +74,6 @@ public:
SAFE_DELETE(_env->_load_stream_mgr);
SAFE_DELETE(_env->_master_info);
SAFE_DELETE(_env->_thread_mgr);
- SAFE_DELETE(_env->_buffer_reservation);
SAFE_DELETE(_env->_task_pool_mem_tracker_registry);
if (_server) {
_server->Stop(100);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]