http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr-reader-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-reader-context.cc b/be/src/runtime/disk-io-mgr-reader-context.cc
index f1e71b2..c01f34b 100644
--- a/be/src/runtime/disk-io-mgr-reader-context.cc
+++ b/be/src/runtime/disk-io-mgr-reader-context.cc
@@ -18,7 +18,7 @@
 using namespace impala;

-void DiskIoMgr::RequestContext::Cancel(const Status& status) {
+void DiskIoRequestContext::Cancel(const Status& status) {
   DCHECK(!status.ok());

   // Callbacks are collected in this vector and invoked while no lock is held.
@@ -28,18 +28,18 @@ void DiskIoMgr::RequestContext::Cancel(const Status& status) {
   DCHECK(Validate()) << endl << DebugString();

   // Already being cancelled
-  if (state_ == RequestContext::Cancelled) return;
+  if (state_ == DiskIoRequestContext::Cancelled) return;

   DCHECK(status_.ok());
   status_ = status;

   // The reader will be put into a cancelled state until all cleanup is complete.
-  state_ = RequestContext::Cancelled;
+  state_ = DiskIoRequestContext::Cancelled;

   // Cancel all scan ranges for this reader. Each range could be in one of
   // four queues.
   for (int i = 0; i < disk_states_.size(); ++i) {
-    RequestContext::PerDiskState& state = disk_states_[i];
+    DiskIoRequestContext::PerDiskState& state = disk_states_[i];
     RequestRange* range = NULL;
     while ((range = state.in_flight_ranges()->Dequeue()) != NULL) {
       if (range->request_type() == RequestType::READ) {
@@ -74,7 +74,7 @@ void DiskIoMgr::RequestContext::Cancel(const Status& status) {
   // Schedule reader on all disks. The disks will notice it is cancelled and do any
   // required cleanup.
   for (int i = 0; i < disk_states_.size(); ++i) {
-    RequestContext::PerDiskState& state = disk_states_[i];
+    DiskIoRequestContext::PerDiskState& state = disk_states_[i];
     state.ScheduleContext(this, i);
   }
 }
@@ -88,10 +88,10 @@ void DiskIoMgr::RequestContext::Cancel(const Status& status) {
   ready_to_start_ranges_cv_.notify_all();
 }

-void DiskIoMgr::RequestContext::AddRequestRange(
+void DiskIoRequestContext::AddRequestRange(
     DiskIoMgr::RequestRange* range, bool schedule_immediately) {
   // DCHECK(lock_.is_locked()); // TODO: boost should have this API
-  RequestContext::PerDiskState& state = disk_states_[range->disk_id()];
+  DiskIoRequestContext::PerDiskState& state = disk_states_[range->disk_id()];
   if (state.done()) {
     DCHECK_EQ(state.num_remaining_ranges(), 0);
     state.set_done(false);
@@ -107,7 +107,7 @@ void DiskIoMgr::RequestContext::AddRequestRange(
     state.unstarted_scan_ranges()->Enqueue(scan_range);
     num_unstarted_scan_ranges_.Add(1);
   }
-  // If next_scan_range_to_start is NULL, schedule this RequestContext so that it will
+  // If next_scan_range_to_start is NULL, schedule this DiskIoRequestContext so that it will
   // be set. If it's not NULL, this context will be scheduled when GetNextRange() is
   // invoked.
   schedule_context = state.next_scan_range_to_start() == NULL;
@@ -126,7 +126,7 @@ void DiskIoMgr::RequestContext::AddRequestRange(
   ++state.num_remaining_ranges();
 }

-DiskIoMgr::RequestContext::RequestContext(DiskIoMgr* parent, int num_disks)
+DiskIoRequestContext::DiskIoRequestContext(DiskIoMgr* parent, int num_disks)
   : parent_(parent),
     bytes_read_counter_(NULL),
     read_timer_(NULL),
@@ -137,7 +137,7 @@ DiskIoMgr::RequestContext::RequestContext(DiskIoMgr* parent, int num_disks)
 }

 // Resets this object.
-void DiskIoMgr::RequestContext::Reset(MemTracker* tracker) {
+void DiskIoRequestContext::Reset(MemTracker* tracker) {
   DCHECK_EQ(state_, Inactive);
   status_ = Status::OK();
@@ -173,13 +173,13 @@ void DiskIoMgr::RequestContext::Reset(MemTracker* tracker) {
 }

 // Dumps out request context information. Lock should be taken by the caller.
-string DiskIoMgr::RequestContext::DebugString() const {
+string DiskIoRequestContext::DebugString() const {
   stringstream ss;
-  ss << endl << " RequestContext: " << (void*)this << " (state=";
-  if (state_ == RequestContext::Inactive) ss << "Inactive";
-  if (state_ == RequestContext::Cancelled) ss << "Cancelled";
-  if (state_ == RequestContext::Active) ss << "Active";
-  if (state_ != RequestContext::Inactive) {
+  ss << endl << " DiskIoRequestContext: " << (void*)this << " (state=";
+  if (state_ == DiskIoRequestContext::Inactive) ss << "Inactive";
+  if (state_ == DiskIoRequestContext::Cancelled) ss << "Cancelled";
+  if (state_ == DiskIoRequestContext::Active) ss << "Active";
+  if (state_ != DiskIoRequestContext::Inactive) {
     ss << " status_=" << (status_.ok() ? "OK" : status_.GetDetail())
        << " #ready_buffers=" << num_ready_buffers_.Load()
        << " #used_buffers=" << num_used_buffers_.Load()
@@ -203,9 +203,9 @@ string DiskIoMgr::RequestContext::DebugString() const {
   return ss.str();
 }

-bool DiskIoMgr::RequestContext::Validate() const {
-  if (state_ == RequestContext::Inactive) {
-    LOG(WARNING) << "state_ == RequestContext::Inactive";
+bool DiskIoRequestContext::Validate() const {
+  if (state_ == DiskIoRequestContext::Inactive) {
+    LOG(WARNING) << "state_ == DiskIoRequestContext::Inactive";
     return false;
   }
@@ -234,7 +234,7 @@ bool DiskIoMgr::RequestContext::Validate() const {
     return false;
   }

-  if (state_ != RequestContext::Cancelled) {
+  if (state_ != DiskIoRequestContext::Cancelled) {
     if (state.unstarted_scan_ranges()->size() + state.in_flight_ranges()->size() >
         state.num_remaining_ranges()) {
       LOG(WARNING) << "disk_id=" << i
@@ -285,7 +285,7 @@ bool DiskIoMgr::RequestContext::Validate() const {
     }
   }

-  if (state_ != RequestContext::Cancelled) {
+  if (state_ != DiskIoRequestContext::Cancelled) {
     if (total_unstarted_ranges != num_unstarted_scan_ranges_.Load()) {
       LOG(WARNING) << "total_unstarted_ranges=" << total_unstarted_ranges
                    << " sum_in_states=" << num_unstarted_scan_ranges_.Load();
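[The Cancel() path above follows a drain-then-reschedule pattern: mark the context cancelled under its lock, dequeue everything that never started from each per-disk queue, then schedule the context on every disk so the disk threads observe the cancelled state and finish their own cleanup. A minimal, self-contained sketch of that pattern, with hypothetical types standing in for the Impala classes:]

#include <deque>
#include <mutex>
#include <vector>

struct PerDiskQueue {
  std::deque<int> pending;   // stand-in for the queued RequestRanges
  bool scheduled = false;    // stand-in for ScheduleContext()
};

class ContextSketch {
 public:
  void Cancel() {
    std::lock_guard<std::mutex> l(lock_);
    if (cancelled_) return;           // already being cancelled
    cancelled_ = true;                // new requests are rejected from now on
    for (PerDiskQueue& q : disk_queues_) {
      q.pending.clear();              // drain work that never started
      q.scheduled = true;             // reschedule so disk threads see cancelled_
    }
  }

 private:
  std::mutex lock_;
  bool cancelled_ = false;
  std::vector<PerDiskQueue> disk_queues_{4};
};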
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr-scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc
index 01ea4b5..25399bc 100644
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ b/be/src/runtime/disk-io-mgr-scan-range.cc
@@ -121,7 +121,7 @@ Status DiskIoMgr::ScanRange::GetNext(BufferDescriptor** buffer) {
   }
   DCHECK(reader_->Validate()) << endl << reader_->DebugString();

-  if (reader_->state_ == RequestContext::Cancelled) {
+  if (reader_->state_ == DiskIoRequestContext::Cancelled) {
     reader_->blocked_ranges_.Remove(this);
     Cancel(reader_->status_);
     (*buffer)->Return();
@@ -230,7 +230,7 @@ void DiskIoMgr::ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64
   mtime_ = mtime;
 }

-void DiskIoMgr::ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) {
+void DiskIoMgr::ScanRange::InitInternal(DiskIoMgr* io_mgr, DiskIoRequestContext* reader) {
   DCHECK(hdfs_file_ == NULL);
   io_mgr_ = io_mgr;
   reader_ = reader;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-stress.cc b/be/src/runtime/disk-io-mgr-stress.cc
index af6ad27..0a8a628 100644
--- a/be/src/runtime/disk-io-mgr-stress.cc
+++ b/be/src/runtime/disk-io-mgr-stress.cc
@@ -52,7 +52,7 @@ string GenerateRandomData() {
 struct DiskIoMgrStress::Client {
   boost::mutex lock;
-  DiskIoMgr::RequestContext* reader;
+  DiskIoRequestContext* reader;
   int file_idx;
   vector<DiskIoMgr::ScanRange*> scan_ranges;
   int abort_at_byte;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc
index ee89f56..46149b5 100644
--- a/be/src/runtime/disk-io-mgr-test.cc
+++ b/be/src/runtime/disk-io-mgr-test.cc
@@ -40,7 +40,7 @@ namespace impala {
 class DiskIoMgrTest : public testing::Test {
  public:
   void WriteValidateCallback(int num_writes, DiskIoMgr::WriteRange** written_range,
-      DiskIoMgr* io_mgr, DiskIoMgr::RequestContext* reader, int32_t* data,
+      DiskIoMgr* io_mgr, DiskIoRequestContext* reader, int32_t* data,
       Status expected_status, const Status& status) {
     if (expected_status.code() == TErrorCode::CANCELLED) {
       EXPECT_TRUE(status.ok() || status.IsCancelled()) << "Error: " << status.GetDetail();
@@ -99,7 +99,7 @@ class DiskIoMgrTest : public testing::Test {
     }
   }

-  static void ValidateSyncRead(DiskIoMgr* io_mgr, DiskIoMgr::RequestContext* reader,
+  static void ValidateSyncRead(DiskIoMgr* io_mgr, DiskIoRequestContext* reader,
       DiskIoMgr::ScanRange* range, const char* expected, int expected_len = -1) {
     DiskIoMgr::BufferDescriptor* buffer;
     ASSERT_OK(io_mgr->Read(reader, range, &buffer));
@@ -134,7 +134,7 @@ class DiskIoMgrTest : public testing::Test {
   // Continues pulling scan ranges from the io mgr until they are all done.
   // Updates num_ranges_processed with the number of ranges seen by this thread.
-  static void ScanRangeThread(DiskIoMgr* io_mgr, DiskIoMgr::RequestContext* reader,
+  static void ScanRangeThread(DiskIoMgr* io_mgr, DiskIoRequestContext* reader,
       const char* expected_result, int expected_len, const Status& expected_status,
       int max_ranges, AtomicInt32* num_ranges_processed) {
     int num_ranges = 0;
@@ -185,14 +185,14 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
   scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 10));
   MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
   ASSERT_OK(read_io_mgr->Init(&reader_mem_tracker));
-  DiskIoMgr::RequestContext* reader;
+  DiskIoRequestContext* reader;
   ASSERT_OK(read_io_mgr->RegisterContext(&reader, &reader_mem_tracker));
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
       pool_.reset(new ObjectPool);
       DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 10);
       ASSERT_OK(io_mgr.Init(&mem_tracker));
-      DiskIoMgr::RequestContext* writer;
+      DiskIoRequestContext* writer;
       io_mgr.RegisterContext(&writer, &mem_tracker);
       for (int i = 0; i < num_ranges; ++i) {
         int32_t* data = pool_->Add(new int32_t);
@@ -228,7 +228,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   string tmp_file = "/tmp/non-existent.txt";
   DiskIoMgr io_mgr(1, 1, 1, 10);
   ASSERT_OK(io_mgr.Init(&mem_tracker));
-  DiskIoMgr::RequestContext* writer;
+  DiskIoRequestContext* writer;
   ASSERT_OK(io_mgr.RegisterContext(&writer));
   pool_.reset(new ObjectPool);
   int32_t* data = pool_->Add(new int32_t);
@@ -238,7 +238,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   DiskIoMgr::WriteRange** new_range = pool_->Add(new DiskIoMgr::WriteRange*);
   DiskIoMgr::WriteRange::WriteDoneCallback callback =
       bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2,
-          new_range, (DiskIoMgr*)NULL, (DiskIoMgr::RequestContext*)NULL,
+          new_range, (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL,
           data, Status(TErrorCode::RUNTIME_ERROR, "Test Failure"), _1);
   *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, rand(), 0, callback));
@@ -255,7 +255,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   new_range = pool_->Add(new DiskIoMgr::WriteRange*);
   callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2,
-      new_range, (DiskIoMgr*)NULL, (DiskIoMgr::RequestContext*)NULL,
+      new_range, (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL,
       data, Status(TErrorCode::RUNTIME_ERROR, "Test Failure"), _1);
   *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, -1, 0, callback));
@@ -291,14 +291,14 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
   scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 10));
   MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
   ASSERT_OK(read_io_mgr->Init(&reader_mem_tracker));
-  DiskIoMgr::RequestContext* reader;
+  DiskIoRequestContext* reader;
   ASSERT_OK(read_io_mgr->RegisterContext(&reader, &reader_mem_tracker));
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
       pool_.reset(new ObjectPool);
       DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 10);
       ASSERT_OK(io_mgr.Init(&mem_tracker));
-      DiskIoMgr::RequestContext* writer;
+      DiskIoRequestContext* writer;
       io_mgr.RegisterContext(&writer, &mem_tracker);
       Status validate_status = Status::OK();
       for (int i = 0; i < num_ranges; ++i) {
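[The writer tests above exercise the write path end to end. A condensed sketch of that flow, using the signatures visible in this diff; the SetData() call and the lambda callback are assumptions for illustration, and error handling is reduced to a Status check:]

void ExampleWrite(DiskIoMgr* io_mgr, MemTracker* tracker, ObjectPool* pool) {
  DiskIoRequestContext* writer;
  Status status = io_mgr->RegisterContext(&writer, tracker);
  if (!status.ok()) return;

  int32_t* data = pool->Add(new int32_t(42));
  DiskIoMgr::WriteRange::WriteDoneCallback callback =
      [](const Status& write_status) {
        // Runs on a disk thread once the write completes (or fails).
      };
  DiskIoMgr::WriteRange* range = pool->Add(new DiskIoMgr::WriteRange(
      "/tmp/example.dat", /* file offset */ 0, /* disk id */ 0, callback));
  range->SetData(reinterpret_cast<const uint8_t*>(data), sizeof(*data));  // assumed API

  status = io_mgr->AddWriteRange(writer, range);  // non-blocking; schedules the write
  // ... wait for the callback before tearing down ...
  io_mgr->UnregisterContext(writer);
}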
@@ -362,7 +362,7 @@ TEST_F(DiskIoMgrTest, SingleReader) {
       ASSERT_OK(io_mgr.Init(&mem_tracker));
       MemTracker reader_mem_tracker;
-      DiskIoMgr::RequestContext* reader;
+      DiskIoRequestContext* reader;
       ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));

       vector<DiskIoMgr::ScanRange*> ranges;
@@ -416,7 +416,7 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
   ASSERT_OK(io_mgr.Init(&mem_tracker));
   MemTracker reader_mem_tracker;
-  DiskIoMgr::RequestContext* reader;
+  DiskIoRequestContext* reader;
   ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
   vector<DiskIoMgr::ScanRange*> ranges_first_half;
@@ -489,7 +489,7 @@ TEST_F(DiskIoMgrTest, SyncReadTest) {
   ASSERT_OK(io_mgr.Init(&mem_tracker));
   MemTracker reader_mem_tracker;
-  DiskIoMgr::RequestContext* reader;
+  DiskIoRequestContext* reader;
   ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
   DiskIoMgr::ScanRange* complete_range = InitRange(1, tmp_file, 0, strlen(data), 0,
@@ -559,7 +559,7 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
   ASSERT_OK(io_mgr.Init(&mem_tracker));
   MemTracker reader_mem_tracker;
-  DiskIoMgr::RequestContext* reader;
+  DiskIoRequestContext* reader;
   ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
   vector<DiskIoMgr::ScanRange*> ranges;
@@ -624,7 +624,7 @@ TEST_F(DiskIoMgrTest, MemLimits) {
   ASSERT_OK(io_mgr.Init(&mem_tracker));
   MemTracker reader_mem_tracker;
-  DiskIoMgr::RequestContext* reader;
+  DiskIoRequestContext* reader;
   ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
   vector<DiskIoMgr::ScanRange*> ranges;
@@ -699,7 +699,7 @@ TEST_F(DiskIoMgrTest, CachedReads) {
   ASSERT_OK(io_mgr.Init(&mem_tracker));
   MemTracker reader_mem_tracker;
-  DiskIoMgr::RequestContext* reader;
+  DiskIoRequestContext* reader;
   ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
   DiskIoMgr::ScanRange* complete_range =
@@ -764,7 +764,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
   stat(file_name.c_str(), &stat_val);

   int64_t iters = 0;
-  vector<DiskIoMgr::RequestContext*> contexts(num_contexts);
+  vector<DiskIoRequestContext*> contexts(num_contexts);
   Status status;
   for (int iteration = 0; iteration < ITERATIONS; ++iteration) {
     for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) {
@@ -838,7 +838,7 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
   vector<string> file_names;
   vector<int64_t> mtimes;
   vector<string> data;
-  vector<DiskIoMgr::RequestContext*> readers;
+  vector<DiskIoRequestContext*> readers;
   vector<char*> results;

   file_names.resize(NUM_READERS);
@@ -1000,7 +1000,7 @@ TEST_F(DiskIoMgrTest, PartialRead) {
   ASSERT_OK(io_mgr->Init(&mem_tracker));
   MemTracker reader_mem_tracker;
-  DiskIoMgr::RequestContext* reader;
+  DiskIoRequestContext* reader;
   ASSERT_OK(io_mgr->RegisterContext(&reader, &reader_mem_tracker));

   // We should not read past the end of file.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 66f0498..448424f 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -104,33 +104,34 @@ DiskIoMgr::HdfsCachedFileHandle::~HdfsCachedFileHandle() {
   hdfs_file_ = NULL;
 }

-// This class provides a cache of RequestContext objects. RequestContexts are recycled.
-// This is good for locality as well as lock contention. The cache has the property that
-// regardless of how many clients get added/removed, the memory locations for
-// existing clients do not change (not the case with std::vector) minimizing the locks we
-// have to take across all readers.
+// This class provides a cache of DiskIoRequestContext objects. DiskIoRequestContexts
+// are recycled.
+// This is good for locality as well as lock contention. The cache has
+// the property that regardless of how many clients get added/removed, the memory
+// locations for existing clients do not change (not the case with std::vector),
+// minimizing the locks we have to take across all readers.
 // All functions on this object are thread safe
 class DiskIoMgr::RequestContextCache {
  public:
   RequestContextCache(DiskIoMgr* io_mgr) : io_mgr_(io_mgr) {}

   // Returns a context to the cache. This object can now be reused.
-  void ReturnContext(RequestContext* reader) {
-    DCHECK(reader->state_ != RequestContext::Inactive);
-    reader->state_ = RequestContext::Inactive;
+  void ReturnContext(DiskIoRequestContext* reader) {
+    DCHECK(reader->state_ != DiskIoRequestContext::Inactive);
+    reader->state_ = DiskIoRequestContext::Inactive;
     lock_guard<mutex> l(lock_);
     inactive_contexts_.push_back(reader);
   }

-  // Returns a new RequestContext object. Allocates a new object if necessary.
-  RequestContext* GetNewContext() {
+  // Returns a new DiskIoRequestContext object. Allocates a new object if necessary.
+  DiskIoRequestContext* GetNewContext() {
     lock_guard<mutex> l(lock_);
     if (!inactive_contexts_.empty()) {
-      RequestContext* reader = inactive_contexts_.front();
+      DiskIoRequestContext* reader = inactive_contexts_.front();
       inactive_contexts_.pop_front();
       return reader;
     } else {
-      RequestContext* reader = new RequestContext(io_mgr_, io_mgr_->num_total_disks());
+      DiskIoRequestContext* reader =
+          new DiskIoRequestContext(io_mgr_, io_mgr_->num_total_disks());
       all_contexts_.push_back(reader);
       return reader;
     }
@@ -138,7 +139,7 @@ class DiskIoMgr::RequestContextCache {

   // This object has the same lifetime as the disk IoMgr.
   ~RequestContextCache() {
-    for (list<RequestContext*>::iterator it = all_contexts_.begin();
+    for (list<DiskIoRequestContext*>::iterator it = all_contexts_.begin();
         it != all_contexts_.end(); ++it) {
       delete *it;
     }
@@ -147,9 +148,9 @@ class DiskIoMgr::RequestContextCache {
   // Validates that all readers are cleaned up and in the inactive state. No locks
   // are taken since this is only called from the disk IoMgr destructor.
   bool ValidateAllInactive() {
-    for (list<RequestContext*>::iterator it = all_contexts_.begin();
+    for (list<DiskIoRequestContext*>::iterator it = all_contexts_.begin();
         it != all_contexts_.end(); ++it) {
-      if ((*it)->state_ != RequestContext::Inactive) {
+      if ((*it)->state_ != DiskIoRequestContext::Inactive) {
         return false;
       }
     }
@@ -166,16 +167,16 @@ class DiskIoMgr::RequestContextCache {
   mutex lock_;

   // List of all request contexts created. Used for debugging
-  list<RequestContext*> all_contexts_;
+  list<DiskIoRequestContext*> all_contexts_;

   // List of inactive readers. These objects can be used for a new reader.
-  list<RequestContext*> inactive_contexts_;
+  list<DiskIoRequestContext*> inactive_contexts_;
 };

 string DiskIoMgr::RequestContextCache::DebugString() {
   lock_guard<mutex> l(lock_);
   stringstream ss;
-  for (list<RequestContext*>::iterator it = all_contexts_.begin();
+  for (list<DiskIoRequestContext*>::iterator it = all_contexts_.begin();
       it != all_contexts_.end(); ++it) {
     unique_lock<mutex> lock((*it)->lock_);
     ss << (*it)->DebugString() << endl;
@@ -193,7 +194,7 @@ string DiskIoMgr::DebugString() {
     ss << " " << (void*) disk_queues_[i] << ":" ;
     if (!disk_queues_[i]->request_contexts.empty()) {
       ss << " Readers: ";
-      for (RequestContext* req_context: disk_queues_[i]->request_contexts) {
+      for (DiskIoRequestContext* req_context: disk_queues_[i]->request_contexts) {
         ss << (void*)req_context;
       }
     }
@@ -206,7 +207,7 @@ DiskIoMgr::BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr) :
   io_mgr_(io_mgr), reader_(NULL), buffer_(NULL) {
 }

-void DiskIoMgr::BufferDescriptor::Reset(RequestContext* reader,
+void DiskIoMgr::BufferDescriptor::Reset(DiskIoRequestContext* reader,
     ScanRange* range, char* buffer, int64_t buffer_len) {
   DCHECK(io_mgr_ != NULL);
   DCHECK(buffer_ == NULL);
@@ -314,7 +315,7 @@ DiskIoMgr::~DiskIoMgr() {
   for (int i = 0; i < disk_queues_.size(); ++i) {
     if (disk_queues_[i] == NULL) continue;
     int disk_id = disk_queues_[i]->disk_id;
-    for (list<RequestContext*>::iterator it = disk_queues_[i]->request_contexts.begin();
+    for (list<DiskIoRequestContext*>::iterator it = disk_queues_[i]->request_contexts.begin();
         it != disk_queues_[i]->request_contexts.end(); ++it) {
       DCHECK_EQ((*it)->disk_states_[disk_id].num_threads_in_op(), 0);
       DCHECK((*it)->disk_states_[disk_id].done());
@@ -384,7 +385,7 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
   return Status::OK();
 }

-Status DiskIoMgr::RegisterContext(RequestContext** request_context,
+Status DiskIoMgr::RegisterContext(DiskIoRequestContext** request_context,
     MemTracker* mem_tracker) {
   DCHECK(request_context_cache_.get() != NULL) << "Must call Init() first.";
   *request_context = request_context_cache_->GetNewContext();
@@ -392,7 +393,7 @@ Status DiskIoMgr::RegisterContext(RequestContext** request_context,
   return Status::OK();
 }

-void DiskIoMgr::UnregisterContext(RequestContext* reader) {
+void DiskIoMgr::UnregisterContext(DiskIoRequestContext* reader) {
   // Blocking cancel (waiting for disks completion).
   CancelContext(reader, true);
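[RequestContextCache above relies on std::list keeping element addresses stable as contexts are created and recycled, which std::vector does not guarantee across reallocation. A generic sketch of the same recycling-cache pattern (hypothetical template, not the Impala class; T must be default-constructible):]

#include <list>
#include <mutex>

template <typename T>
class RecyclingCache {
 public:
  T* Get() {
    std::lock_guard<std::mutex> l(lock_);
    if (!inactive_.empty()) {
      T* obj = inactive_.front();  // reuse a previously returned object
      inactive_.pop_front();
      return obj;
    }
    all_.push_back(T());           // allocate; a list node's address never moves
    return &all_.back();
  }

  void Return(T* obj) {
    std::lock_guard<std::mutex> l(lock_);
    inactive_.push_back(obj);      // obj becomes available for the next Get()
  }

 private:
  std::mutex lock_;
  std::list<T> all_;        // owns every object ever created (debugging, teardown)
  std::list<T*> inactive_;  // objects currently available for reuse
};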
@@ -425,7 +426,7 @@ void DiskIoMgr::UnregisterContext(RequestContext* reader) {
 // outstanding reference to the context decrements the number of disk queues the context
 // is on.
 // If wait_for_disks_completion is true, wait for the number of active disks to become 0.
-void DiskIoMgr::CancelContext(RequestContext* context, bool wait_for_disks_completion) {
+void DiskIoMgr::CancelContext(DiskIoRequestContext* context, bool wait_for_disks_completion) {
   context->Cancel(Status::CANCELLED);

   if (wait_for_disks_completion) {
@@ -437,50 +438,50 @@ void DiskIoMgr::CancelContext(RequestContext* context, bool wait_for_disks_compl
   }
 }

-void DiskIoMgr::set_read_timer(RequestContext* r, RuntimeProfile::Counter* c) {
+void DiskIoMgr::set_read_timer(DiskIoRequestContext* r, RuntimeProfile::Counter* c) {
   r->read_timer_ = c;
 }

-void DiskIoMgr::set_bytes_read_counter(RequestContext* r, RuntimeProfile::Counter* c) {
+void DiskIoMgr::set_bytes_read_counter(DiskIoRequestContext* r, RuntimeProfile::Counter* c) {
   r->bytes_read_counter_ = c;
 }

-void DiskIoMgr::set_active_read_thread_counter(RequestContext* r,
+void DiskIoMgr::set_active_read_thread_counter(DiskIoRequestContext* r,
     RuntimeProfile::Counter* c) {
   r->active_read_thread_counter_ = c;
 }

-void DiskIoMgr::set_disks_access_bitmap(RequestContext* r,
+void DiskIoMgr::set_disks_access_bitmap(DiskIoRequestContext* r,
     RuntimeProfile::Counter* c) {
   r->disks_accessed_bitmap_ = c;
 }

-int64_t DiskIoMgr::queue_size(RequestContext* reader) const {
+int64_t DiskIoMgr::queue_size(DiskIoRequestContext* reader) const {
   return reader->num_ready_buffers_.Load();
 }

-Status DiskIoMgr::context_status(RequestContext* context) const {
+Status DiskIoMgr::context_status(DiskIoRequestContext* context) const {
   unique_lock<mutex> lock(context->lock_);
   return context->status_;
 }

-int64_t DiskIoMgr::bytes_read_local(RequestContext* reader) const {
+int64_t DiskIoMgr::bytes_read_local(DiskIoRequestContext* reader) const {
   return reader->bytes_read_local_.Load();
 }

-int64_t DiskIoMgr::bytes_read_short_circuit(RequestContext* reader) const {
+int64_t DiskIoMgr::bytes_read_short_circuit(DiskIoRequestContext* reader) const {
   return reader->bytes_read_short_circuit_.Load();
 }

-int64_t DiskIoMgr::bytes_read_dn_cache(RequestContext* reader) const {
+int64_t DiskIoMgr::bytes_read_dn_cache(DiskIoRequestContext* reader) const {
   return reader->bytes_read_dn_cache_.Load();
 }

-int DiskIoMgr::num_remote_ranges(RequestContext* reader) const {
+int DiskIoMgr::num_remote_ranges(DiskIoRequestContext* reader) const {
   return reader->num_remote_ranges_.Load();
 }

-int64_t DiskIoMgr::unexpected_remote_bytes(RequestContext* reader) const {
+int64_t DiskIoMgr::unexpected_remote_bytes(DiskIoRequestContext* reader) const {
   return reader->unexpected_remote_bytes_.Load();
 }

@@ -499,7 +500,7 @@ Status DiskIoMgr::ValidateScanRange(ScanRange* range) {
   return Status::OK();
 }

-Status DiskIoMgr::AddScanRanges(RequestContext* reader,
+Status DiskIoMgr::AddScanRanges(DiskIoRequestContext* reader,
     const vector<ScanRange*>& ranges, bool schedule_immediately) {
   if (ranges.empty()) return Status::OK();
@@ -513,7 +514,7 @@ Status DiskIoMgr::AddScanRanges(RequestContext* reader,
   unique_lock<mutex> reader_lock(reader->lock_);
   DCHECK(reader->Validate()) << endl << reader->DebugString();

-  if (reader->state_ == RequestContext::Cancelled) {
+  if (reader->state_ == DiskIoRequestContext::Cancelled) {
     DCHECK(!reader->status_.ok());
     return reader->status_;
   }
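[Putting the reader-side API together: AddScanRanges() above is non-blocking, GetNextRange() blocks for a prepared range, and the per-range GetNext()/Return() loop drains buffers. An illustrative consumer loop over those calls, mirroring the ScanRangeThread test helper; the eosr() accessor is an assumption, and Impala's RETURN_IF_ERROR macro collapses the error paths:]

Status ExampleReadLoop(DiskIoMgr* io_mgr, DiskIoRequestContext* reader,
    const std::vector<DiskIoMgr::ScanRange*>& ranges) {
  RETURN_IF_ERROR(io_mgr->AddScanRanges(reader, ranges));   // non-blocking
  while (true) {
    DiskIoMgr::ScanRange* range;
    RETURN_IF_ERROR(io_mgr->GetNextRange(reader, &range));  // blocks for work
    if (range == NULL) break;                               // no more unstarted ranges
    bool eos = false;
    while (!eos) {
      DiskIoMgr::BufferDescriptor* buffer;
      RETURN_IF_ERROR(range->GetNext(&buffer));
      eos = buffer->eosr();  // assumed end-of-scan-range flag
      // ... consume the buffer's bytes here ...
      buffer->Return();      // recycle the IO buffer back to the IoMgr
    }
  }
  return Status::OK();
}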
@@ -545,7 +546,7 @@ Status DiskIoMgr::AddScanRanges(RequestContext* reader,

 // This function returns the next scan range the reader should work on, checking
 // for eos and error cases. If there isn't already a cached scan range or a scan
 // range prepared by the disk threads, the caller waits on the disk threads.
-Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
+Status DiskIoMgr::GetNextRange(DiskIoRequestContext* reader, ScanRange** range) {
   DCHECK(reader != NULL);
   DCHECK(range != NULL);
   *range = NULL;
@@ -555,7 +556,7 @@ Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
   DCHECK(reader->Validate()) << endl << reader->DebugString();

   while (true) {
-    if (reader->state_ == RequestContext::Cancelled) {
+    if (reader->state_ == DiskIoRequestContext::Cancelled) {
       DCHECK(!reader->status_.ok());
       status = reader->status_;
       break;
@@ -599,7 +600,7 @@ Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
   return status;
 }

-Status DiskIoMgr::Read(RequestContext* reader,
+Status DiskIoMgr::Read(DiskIoRequestContext* reader,
     ScanRange* range, BufferDescriptor** buffer) {
   DCHECK(range != NULL);
   DCHECK(buffer != NULL);
@@ -623,7 +624,7 @@ void DiskIoMgr::ReturnBuffer(BufferDescriptor* buffer_desc) {
   DCHECK(buffer_desc != NULL);
   if (!buffer_desc->status_.ok()) DCHECK(buffer_desc->buffer_ == NULL);

-  RequestContext* reader = buffer_desc->reader_;
+  DiskIoRequestContext* reader = buffer_desc->reader_;
   if (buffer_desc->buffer_ != NULL) {
     if (buffer_desc->scan_range_->cached_buffer_ == NULL) {
       // Not a cached buffer. Return the io buffer and update mem tracking.
@@ -655,7 +656,7 @@ void DiskIoMgr::ReturnBufferDesc(BufferDescriptor* desc) {
 }

 DiskIoMgr::BufferDescriptor* DiskIoMgr::GetBufferDesc(
-    RequestContext* reader, ScanRange* range, char* buffer, int64_t buffer_size) {
+    DiskIoRequestContext* reader, ScanRange* range, char* buffer, int64_t buffer_size) {
   BufferDescriptor* buffer_desc;
   {
     unique_lock<mutex> lock(free_buffers_lock_);
@@ -771,18 +772,18 @@ void DiskIoMgr::ReturnFreeBuffer(char* buffer, int64_t buffer_size) {
 // b) Adds an unstarted write range to in_flight_ranges_. The write range is processed
 //    immediately if there are no preceding scan ranges in in_flight_ranges_.
 // It blocks until work is available or the thread is shut down.
-// Work is available if there is a RequestContext with
+// Work is available if there is a DiskIoRequestContext with
 //  - A ScanRange with a buffer available, or
 //  - A WriteRange in unstarted_write_ranges_.
 bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
-    RequestContext** request_context) {
+    DiskIoRequestContext** request_context) {
   int disk_id = disk_queue->disk_id;
   *range = NULL;

   // This loop returns either with work to do or when the disk IoMgr shuts down.
   while (true) {
     *request_context = NULL;
-    RequestContext::PerDiskState* request_disk_state = NULL;
+    DiskIoRequestContext::PerDiskState* request_disk_state = NULL;
     {
       unique_lock<mutex> disk_lock(disk_queue->lock);
@@ -827,12 +828,12 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
         << (*request_context)->DebugString();

     // Check if reader has been cancelled
-    if ((*request_context)->state_ == RequestContext::Cancelled) {
+    if ((*request_context)->state_ == DiskIoRequestContext::Cancelled) {
       request_disk_state->DecrementRequestThreadAndCheckDone(*request_context);
       continue;
     }
-    DCHECK_EQ((*request_context)->state_, RequestContext::Active)
+    DCHECK_EQ((*request_context)->state_, DiskIoRequestContext::Active)
         << (*request_context)->DebugString();

     if (request_disk_state->next_scan_range_to_start() == NULL &&
@@ -889,7 +890,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
   return false;
 }

-void DiskIoMgr::HandleWriteFinished(RequestContext* writer, WriteRange* write_range,
+void DiskIoMgr::HandleWriteFinished(DiskIoRequestContext* writer, WriteRange* write_range,
     const Status& write_status) {
   // Execute the callback before decrementing the thread count. Otherwise CancelContext()
   // that waits for the disk ref count to be 0 will return, creating a race, e.g.
@@ -900,8 +901,8 @@ void DiskIoMgr::HandleWriteFinished(RequestContext* writer, WriteRange* write_ra
   {
     unique_lock<mutex> writer_lock(writer->lock_);
     DCHECK(writer->Validate()) << endl << writer->DebugString();
-    RequestContext::PerDiskState& state = writer->disk_states_[write_range->disk_id_];
-    if (writer->state_ == RequestContext::Cancelled) {
+    DiskIoRequestContext::PerDiskState& state = writer->disk_states_[write_range->disk_id_];
+    if (writer->state_ == DiskIoRequestContext::Cancelled) {
       state.DecrementRequestThreadAndCheckDone(writer);
     } else {
       state.DecrementRequestThread();
@@ -910,16 +911,16 @@ void DiskIoMgr::HandleWriteFinished(RequestContext* writer, WriteRange* write_ra
   }
 }

-void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader,
+void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext* reader,
     BufferDescriptor* buffer) {
   unique_lock<mutex> reader_lock(reader->lock_);

-  RequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
+  DiskIoRequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
   DCHECK(reader->Validate()) << endl << reader->DebugString();
   DCHECK_GT(state.num_threads_in_op(), 0);
   DCHECK(buffer->buffer_ != NULL);

-  if (reader->state_ == RequestContext::Cancelled) {
+  if (reader->state_ == DiskIoRequestContext::Cancelled) {
     state.DecrementRequestThreadAndCheckDone(reader);
     DCHECK(reader->Validate()) << endl << reader->DebugString();
     ReturnFreeBuffer(buffer);
@@ -930,7 +931,7 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader
     return;
   }

-  DCHECK_EQ(reader->state_, RequestContext::Active);
+  DCHECK_EQ(reader->state_, DiskIoRequestContext::Active);
   DCHECK(buffer->buffer_ != NULL);

   // Update the reader's scan ranges. There are three cases here:
@@ -979,7 +980,7 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
 //   3. Perform the read or write as specified.
 // Cancellation checking needs to happen in both steps 1 and 3.
   while (true) {
-    RequestContext* worker_context = NULL;;
+    DiskIoRequestContext* worker_context = NULL;;
     RequestRange* range = NULL;

     if (!GetNextRequestRange(disk_queue, &range, &worker_context)) {
@@ -1000,7 +1001,7 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {

 // This function reads the specified scan range associated with the
 // specified reader context and disk queue.
-void DiskIoMgr::ReadRange(DiskQueue* disk_queue, RequestContext* reader,
+void DiskIoMgr::ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader,
     ScanRange* range) {
   char* buffer = NULL;
   int64_t bytes_remaining = range->len_ - range->bytes_read_;
@@ -1017,11 +1018,11 @@ void DiskIoMgr::ReadRange(DiskQueue* disk_queue, RequestContext* reader,
   }

   if (!enough_memory) {
-    RequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
+    DiskIoRequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
     unique_lock<mutex> reader_lock(reader->lock_);

     // Just grabbed the reader lock, check for cancellation.
-    if (reader->state_ == RequestContext::Cancelled) {
+    if (reader->state_ == DiskIoRequestContext::Cancelled) {
       DCHECK(reader->Validate()) << endl << reader->DebugString();
       state.DecrementRequestThreadAndCheckDone(reader);
       range->Cancel(reader->status_);
@@ -1087,7 +1088,7 @@ void DiskIoMgr::ReadRange(DiskQueue* disk_queue, RequestContext* reader,
   HandleReadFinished(disk_queue, reader, buffer_desc);
 }

-void DiskIoMgr::Write(RequestContext* writer_context, WriteRange* write_range) {
+void DiskIoMgr::Write(DiskIoRequestContext* writer_context, WriteRange* write_range) {
   FILE* file_handle = fopen(write_range->file(), "rb+");
   Status ret_status;
   if (file_handle == NULL) {
@@ -1137,11 +1138,11 @@ int DiskIoMgr::free_buffers_idx(int64_t buffer_size) {
   return idx;
 }

-Status DiskIoMgr::AddWriteRange(RequestContext* writer, WriteRange* write_range) {
+Status DiskIoMgr::AddWriteRange(DiskIoRequestContext* writer, WriteRange* write_range) {
   DCHECK_LE(write_range->len(), max_buffer_size_);
   unique_lock<mutex> writer_lock(writer->lock_);

-  if (writer->state_ == RequestContext::Cancelled) {
+  if (writer->state_ == DiskIoRequestContext::Cancelled) {
     DCHECK(!writer->status_.ok());
     return writer->status_;
   }
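[The three-step disk-thread loop described in WorkLoop() above reduces to a small dispatch loop. A pseudocode-level restatement using the names from this diff; it ignores access control (these are private members) and elides the per-step cancellation checks:]

void WorkLoopSketch(DiskIoMgr* io_mgr, DiskIoMgr::DiskQueue* queue) {
  while (true) {
    DiskIoRequestContext* context = NULL;
    DiskIoMgr::RequestRange* range = NULL;
    // Step 1: blocks until work is available; false only on shutdown.
    if (!io_mgr->GetNextRequestRange(queue, &range, &context)) break;
    // Steps 2-3: dispatch on the request type and perform the IO.
    if (range->request_type() == RequestType::READ) {
      io_mgr->ReadRange(queue, context, static_cast<DiskIoMgr::ScanRange*>(range));
    } else {
      io_mgr->Write(context, static_cast<DiskIoMgr::WriteRange*>(range));
    }
  }
}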
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index b130d52..79902c5 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -23,7 +23,6 @@
 #include <boost/unordered_set.hpp>
 #include <boost/thread/mutex.hpp>
 #include <boost/thread/condition_variable.hpp>
-#include <boost/thread/thread.hpp>

 #include "common/atomic.h"
 #include "common/hdfs.h"
@@ -42,7 +41,7 @@ namespace impala {
 class MemTracker;

 /// Manager object that schedules IO for all queries on all disks and remote filesystems
-/// (such as S3). Each query maps to one or more RequestContext objects, each of which
+/// (such as S3). Each query maps to one or more DiskIoRequestContext objects, each of which
 /// has its own queue of scan ranges and/or write ranges.
 //
 /// The API splits up requesting scan/write ranges (non-blocking) and reading the data
@@ -185,12 +184,14 @@ class MemTracker;
 /// - Internal classes are defined in disk-io-mgr-internal.h
 /// - ScanRange APIs are implemented in disk-io-mgr-scan-range.cc
 ///   This contains the ready buffer queue logic
-/// - RequestContext APIs are implemented in disk-io-mgr-reader-context.cc
+/// - DiskIoRequestContext APIs are implemented in disk-io-mgr-reader-context.cc
 ///   This contains the logic for picking scan ranges for a reader.
 /// - Disk Thread and general APIs are implemented in disk-io-mgr.cc.
+
+class DiskIoRequestContext;
+
 class DiskIoMgr {
  public:
-  class RequestContext;
   class ScanRange;

   /// This class is a small wrapper around the hdfsFile handle and the file system
@@ -247,16 +248,17 @@ class DiskIoMgr {
    private:
     friend class DiskIoMgr;
+    friend class DiskIoRequestContext;
     BufferDescriptor(DiskIoMgr* io_mgr);

     /// Resets the buffer descriptor state for a new reader, range and data buffer.
-    void Reset(RequestContext* reader, ScanRange* range, char* buffer,
+    void Reset(DiskIoRequestContext* reader, ScanRange* range, char* buffer,
         int64_t buffer_len);

     DiskIoMgr* io_mgr_;

     /// Reader that this buffer is for
-    RequestContext* reader_;
+    DiskIoRequestContext* reader_;

     /// The current tracker this buffer is associated with.
     MemTracker* mem_tracker_;
@@ -367,9 +369,10 @@ class DiskIoMgr {
    private:
     friend class DiskIoMgr;
+    friend class DiskIoRequestContext;

     /// Initialize internal fields
-    void InitInternal(DiskIoMgr* io_mgr, RequestContext* reader);
+    void InitInternal(DiskIoMgr* io_mgr, DiskIoRequestContext* reader);

     /// Enqueues a buffer for this range. This does not block.
     /// Returns true if this scan range has hit the queue capacity, false otherwise.
@@ -423,7 +426,7 @@ class DiskIoMgr {
     DiskIoMgr* io_mgr_;

     /// Reader/owner of the scan range
-    RequestContext* reader_;
+    DiskIoRequestContext* reader_;

     /// File handle either to hdfs or local fs (FILE*)
     ///
@@ -446,7 +449,7 @@ class DiskIoMgr {
     int bytes_read_;

     /// Status for this range. This is non-ok if is_cancelled_ is true.
-    /// Note: an individual range can fail without the RequestContext being
+    /// Note: an individual range can fail without the DiskIoRequestContext being
     /// cancelled. This allows us to skip individual ranges.
     Status status_;
@@ -509,6 +512,7 @@ class DiskIoMgr {
    private:
     friend class DiskIoMgr;
+    friend class DiskIoRequestContext;

     /// Data to be written. RequestRange::len_ contains the length of data
     /// to be written.
@@ -540,13 +544,13 @@ class DiskIoMgr {

   /// Allocates tracking structure for a request context.
   /// Register a new request context which is returned in *request_context.
-  /// The IoMgr owns the allocated RequestContext object. The caller must call
+  /// The IoMgr owns the allocated DiskIoRequestContext object. The caller must call
   /// UnregisterContext() for each context.
   /// reader_mem_tracker: Is non-null only for readers. IO buffers
   /// used for this reader will be tracked by this. If the limit is exceeded
   /// the reader will be cancelled and MEM_LIMIT_EXCEEDED will be returned via
   /// GetNext().
-  Status RegisterContext(RequestContext** request_context,
+  Status RegisterContext(DiskIoRequestContext** request_context,
       MemTracker* reader_mem_tracker = NULL);

   /// Unregisters context from the disk IoMgr. This must be called for every
@@ -555,7 +559,7 @@ class DiskIoMgr {
   /// The 'context' cannot be used after this call.
   /// This call blocks until all the disk threads have finished cleaning up.
   /// UnregisterContext also cancels the reader/writer from the disk IoMgr.
-  void UnregisterContext(RequestContext* context);
+  void UnregisterContext(DiskIoRequestContext* context);

   /// This function cancels the context asynchronously. All outstanding requests
   /// are aborted and tracking structures cleaned up. This does not need to be
@@ -565,7 +569,7 @@ class DiskIoMgr {
   /// context to reach 0. After calling with wait_for_disks_completion = true, the only
   /// valid API is returning IO buffers that have already been returned.
   /// Takes context->lock_ if wait_for_disks_completion is true.
-  void CancelContext(RequestContext* context, bool wait_for_disks_completion = false);
+  void CancelContext(DiskIoRequestContext* context, bool wait_for_disks_completion = false);

   /// Adds the scan ranges to the queues. This call is non-blocking. The caller must
   /// not deallocate the scan range pointers before UnregisterContext().
@@ -573,26 +577,26 @@ class DiskIoMgr {
   /// (i.e. the caller should not/cannot call GetNextRange for these ranges).
   /// This can be used to do synchronous reads as well as schedule dependent ranges,
   /// as in the case for columnar formats.
-  Status AddScanRanges(RequestContext* reader, const std::vector<ScanRange*>& ranges,
+  Status AddScanRanges(DiskIoRequestContext* reader, const std::vector<ScanRange*>& ranges,
       bool schedule_immediately = false);

   /// Add a WriteRange for the writer. This is non-blocking and schedules the context
   /// on the IoMgr disk queue. Does not create any files.
-  Status AddWriteRange(RequestContext* writer, WriteRange* write_range);
+  Status AddWriteRange(DiskIoRequestContext* writer, WriteRange* write_range);

   /// Returns the next unstarted scan range for this reader. When the range is returned,
   /// the disk threads in the IoMgr will already have started reading from it. The
   /// caller is expected to call ScanRange::GetNext on the returned range.
   /// If there are no more unstarted ranges, NULL is returned.
   /// This call is blocking.
-  Status GetNextRange(RequestContext* reader, ScanRange** range);
+  Status GetNextRange(DiskIoRequestContext* reader, ScanRange** range);

   /// Reads the range and returns the result in buffer.
   /// This behaves like the typical synchronous read() api, blocking until the data
   /// is read. This can be called while there are outstanding ScanRanges and is
   /// thread safe. Multiple threads can be calling Read() per reader at a time.
   /// range *cannot* have already been added via AddScanRanges.
-  Status Read(RequestContext* reader, ScanRange* range, BufferDescriptor** buffer);
+  Status Read(DiskIoRequestContext* reader, ScanRange* range, BufferDescriptor** buffer);

   /// Determine which disk queue this file should be assigned to. Returns an index into
   /// disk_queues_. The disk_id is the volume ID for the local disk that holds the
   /// file, or -1 if unknown. Flag expected_local is true iff this impalad is
   /// co-located with the datanode for this file.
   int AssignQueue(const char* file, int disk_id, bool expected_local);

-  /// TODO: The functions below can be moved to RequestContext.
+  /// TODO: The functions below can be moved to DiskIoRequestContext.
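[AssignQueue() above maps a file to a disk queue. A hypothetical sketch consistent with that contract, not the actual implementation: a known local volume ID maps directly to that disk's queue, while an unknown disk_id (-1) is spread deterministically across queues by hashing the file name. The real IoMgr also routes remote filesystems such as S3 to dedicated queues:]

#include <functional>
#include <string>

int AssignQueueSketch(const char* file, int disk_id, int num_local_disks) {
  // Known local volume: use its own queue so requests stay ordered per disk.
  if (disk_id >= 0 && disk_id < num_local_disks) return disk_id;
  // Unknown volume: hash the path so the same file always lands on one queue.
  size_t h = std::hash<std::string>()(file);
  return static_cast<int>(h % num_local_disks);
}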
   /// Returns the current status of the context.
-  Status context_status(RequestContext* context) const;
+  Status context_status(DiskIoRequestContext* context) const;

-  void set_bytes_read_counter(RequestContext*, RuntimeProfile::Counter*);
-  void set_read_timer(RequestContext*, RuntimeProfile::Counter*);
-  void set_active_read_thread_counter(RequestContext*, RuntimeProfile::Counter*);
-  void set_disks_access_bitmap(RequestContext*, RuntimeProfile::Counter*);
+  void set_bytes_read_counter(DiskIoRequestContext*, RuntimeProfile::Counter*);
+  void set_read_timer(DiskIoRequestContext*, RuntimeProfile::Counter*);
+  void set_active_read_thread_counter(DiskIoRequestContext*, RuntimeProfile::Counter*);
+  void set_disks_access_bitmap(DiskIoRequestContext*, RuntimeProfile::Counter*);

-  int64_t queue_size(RequestContext* reader) const;
-  int64_t bytes_read_local(RequestContext* reader) const;
-  int64_t bytes_read_short_circuit(RequestContext* reader) const;
-  int64_t bytes_read_dn_cache(RequestContext* reader) const;
-  int num_remote_ranges(RequestContext* reader) const;
-  int64_t unexpected_remote_bytes(RequestContext* reader) const;
+  int64_t queue_size(DiskIoRequestContext* reader) const;
+  int64_t bytes_read_local(DiskIoRequestContext* reader) const;
+  int64_t bytes_read_short_circuit(DiskIoRequestContext* reader) const;
+  int64_t bytes_read_dn_cache(DiskIoRequestContext* reader) const;
+  int num_remote_ranges(DiskIoRequestContext* reader) const;
+  int64_t unexpected_remote_bytes(DiskIoRequestContext* reader) const;

   /// Returns the read throughput across all readers.
   /// TODO: should this be a sliding window? This should report metrics for the
@@ -671,6 +675,7 @@ class DiskIoMgr {
  private:
   friend class BufferDescriptor;
+  friend class DiskIoRequestContext;
   struct DiskQueue;
   class RequestContextCache;
@@ -757,7 +762,7 @@ class DiskIoMgr {
   /// should be <= max_buffer_size_. These constraints will be met if buffer was acquired
   /// via GetFreeBuffer() (which it should have been).
   BufferDescriptor* GetBufferDesc(
-      RequestContext* reader, ScanRange* range, char* buffer, int64_t buffer_size);
+      DiskIoRequestContext* reader, ScanRange* range, char* buffer, int64_t buffer_size);

   /// Returns a buffer desc object which can now be used for another reader.
   void ReturnBufferDesc(BufferDescriptor* desc);
@@ -798,11 +803,11 @@ class DiskIoMgr {
   /// Only returns false if the disk thread should be shut down.
   /// No locks should be taken before this function call and none are left taken after.
   bool GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
-      RequestContext** request_context);
+      DiskIoRequestContext** request_context);

   /// Updates disk queue and reader state after a read is complete. The read result
   /// is captured in the buffer descriptor.
-  void HandleReadFinished(DiskQueue*, RequestContext*, BufferDescriptor*);
+  void HandleReadFinished(DiskQueue*, DiskIoRequestContext*, BufferDescriptor*);

   /// Invokes write_range->callback_ after the range has been written and
   /// updates per-disk state and handle state. The status of the write OK/RUNTIME_ERROR
   /// is passed via write_status and to the callback.
   /// The write_status does not affect the writer->status_. That is, a write error does
   /// not cancel the writer context - that decision is left to the callback handler.
   /// TODO: On the read path, consider not canceling the reader context on error.
-  void HandleWriteFinished(RequestContext* writer, WriteRange* write_range,
+  void HandleWriteFinished(DiskIoRequestContext* writer, WriteRange* write_range,
       const Status& write_status);

   /// Validates that range is correctly initialized
@@ -818,7 +823,7 @@ class DiskIoMgr {

   /// Write the specified range to disk and calls HandleWriteFinished when done.
   /// Responsible for opening and closing the file that is written.
-  void Write(RequestContext* writer_context, WriteRange* write_range);
+  void Write(DiskIoRequestContext* writer_context, WriteRange* write_range);

   /// Helper method to write a range using the specified FILE handle. Returns Status::OK
   /// if the write succeeded, or a RUNTIME_ERROR with an appropriate message otherwise.
@@ -826,7 +831,7 @@ class DiskIoMgr {
   Status WriteRangeHelper(FILE* file_handle, WriteRange* write_range);

   /// Reads the specified scan range and calls HandleReadFinished when done.
-  void ReadRange(DiskQueue* disk_queue, RequestContext* reader,
+  void ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader,
       ScanRange* range);
 };

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 07378e4..7eaa3db 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -22,6 +22,7 @@
 #include "common/logging.h"
 #include "resourcebroker/resource-broker.h"
+#include "runtime/backend-client.h"
 #include "runtime/client-cache.h"
 #include "runtime/coordinator.h"
 #include "runtime/data-stream-mgr.h"
@@ -38,6 +39,7 @@
 #include "statestore/statestore-subscriber.h"
 #include "util/debug-util.h"
 #include "util/default-path-handlers.h"
+#include "util/hdfs-bulk-ops.h"
 #include "util/mem-info.h"
 #include "util/metrics.h"
 #include "util/network-util.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index e405bf8..c5404dc 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -17,35 +17,35 @@
 #define IMPALA_RUNTIME_EXEC_ENV_H

 #include <boost/scoped_ptr.hpp>
-#include <boost/shared_ptr.hpp>
-#include <boost/thread/thread.hpp>
+// NOTE: try not to add more headers here: exec-env.h is included in many many files.

 #include "common/status.h"
-#include "runtime/backend-client.h"
-#include "util/cgroups-mgr.h"
-#include "util/hdfs-bulk-ops.h" // For declaration of HdfsOpThreadPool
-#include "resourcebroker/resource-broker.h"
+#include "runtime/client-cache-types.h"
+#include "util/hdfs-bulk-ops-defs.h" // For declaration of HdfsOpThreadPool

 namespace impala {

+class CallableThreadPool;
+class CgroupsMgr;
 class DataStreamMgr;
 class DiskIoMgr;
+class FragmentMgr;
+class Frontend;
 class HBaseTableFactory;
 class HdfsFsCache;
+class ImpalaServer;
 class LibCache;
+class MemTracker;
+class MetricGroup;
+class QueryResourceMgr;
+class RequestPoolService;
+class ResourceBroker;
 class Scheduler;
 class StatestoreSubscriber;
 class TestExecEnv;
-class Webserver;
-class MetricGroup;
-class MemTracker;
 class ThreadResourceMgr;
-class CgroupsManager;
-class ImpalaServer;
-class RequestPoolService;
-class FragmentMgr;
-class Frontend;
 class TmpFileMgr;
+class Webserver;

 /// Execution environment for queries/plan fragments.
 /// Contains all required global structures, and handles to

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/lib-cache.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.cc b/be/src/runtime/lib-cache.cc
index 5c1e423..b3a6f0a 100644
--- a/be/src/runtime/lib-cache.cc
+++ b/be/src/runtime/lib-cache.cc
@@ -29,14 +29,15 @@
 #include "common/names.h"

 namespace filesystem = boost::filesystem;
-using namespace impala;

 DEFINE_string(local_library_dir, "/tmp", "Local directory to copy UDF libraries from HDFS into");

+namespace impala {
+
 scoped_ptr<LibCache> LibCache::instance_;

-struct LibCache::LibCacheEntry {
+struct LibCacheEntry {
   // Lock protecting all fields in this entry
   boost::mutex lock;
@@ -53,7 +54,7 @@ struct LibCache::LibCacheEntry {
   bool check_needs_refresh;

   // The type of this file.
-  LibType type;
+  LibCache::LibType type;

   // The path on the local file system for this library.
   std::string local_path;
@@ -117,7 +118,7 @@ Status LibCache::InitInternal() {
   return Status::OK();
 }

-LibCache::LibCacheEntry::~LibCacheEntry() {
+LibCacheEntry::~LibCacheEntry() {
   if (shared_object_handle != NULL) {
     DCHECK_EQ(use_count, 0);
     DCHECK(should_remove);
@@ -418,3 +419,5 @@ string LibCache::MakeLocalPath(const string& hdfs_path, const string& local_dir)
       << (num_libs_copied_.Add(1) - 1) << src.extension().native();
   return dst.str();
 }
+
+}
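[The exec-env.h and lib-cache changes above both trim compile-time dependencies: heavy #includes are replaced with forward declarations, and LibCacheEntry becomes an opaque struct declared outside the class. A small sketch of the pattern, with hypothetical file and type names:]

// widget-registry.h -- no #include "widget.h" required:
#include <vector>

class Widget;                       // forward declaration suffices for Widget*

class WidgetRegistry {
 public:
  void Register(Widget* w);         // pointer parameter: no full type needed
 private:
  std::vector<Widget*> widgets_;    // pointers only, so the definition can wait
};

// widget-registry.cc -- only the implementation file pays for the include:
// #include "widget.h"

[The payoff is that every file including widget-registry.h no longer transitively recompiles when widget.h changes.]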
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/lib-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.h b/be/src/runtime/lib-cache.h
index b36a859..9201341 100644
--- a/be/src/runtime/lib-cache.h
+++ b/be/src/runtime/lib-cache.h
@@ -50,10 +50,10 @@ class RuntimeState;
 /// TODO:
 /// - refresh libraries
 /// - better cached module management.
+struct LibCacheEntry;
+
 class LibCache {
  public:
-  struct LibCacheEntry;
-
   enum LibType {
     TYPE_SO,      // Shared object
     TYPE_IR,      // IR intermediate

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/mem-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool.h b/be/src/runtime/mem-pool.h
index 9e22e9f..fdf38e9 100644
--- a/be/src/runtime/mem-pool.h
+++ b/be/src/runtime/mem-pool.h
@@ -23,7 +23,6 @@
 #include "common/logging.h"
 #include "util/bit-util.h"
-#include "util/runtime-profile.h"

 namespace impala {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 9534671..73c9300 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -29,7 +29,7 @@
 #include "util/debug-util.h"
 #include "util/internal-queue.h"
 #include "util/metrics.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 #include "util/spinlock.h"

 #include "gen-cpp/Types_types.h"  // for TUniqueId

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index 99ed75d..75c5f58 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -29,10 +29,13 @@
 #include "exec/hdfs-scan-node.h"
 #include "exec/hbase-table-scanner.h"
 #include "exprs/expr.h"
+#include "resourcebroker/resource-broker.h"
 #include "runtime/descriptors.h"
 #include "runtime/data-stream-mgr.h"
 #include "runtime/row-batch.h"
+#include "runtime/runtime-filter-bank.h"
 #include "runtime/mem-tracker.h"
+#include "scheduling/query-resource-mgr.h"
 #include "util/cgroups-mgr.h"
 #include "util/cpu-info.h"
 #include "util/debug-util.h"
@@ -589,7 +592,7 @@ void PlanFragmentExecutor::Close() {
         runtime_state_->fragment_instance_id(), runtime_state_->cgroup());
   }
   if (plan_ != NULL) plan_->Close(runtime_state_.get());
-  for (DiskIoMgr::RequestContext* context: *runtime_state_->reader_contexts()) {
+  for (DiskIoRequestContext* context: *runtime_state_->reader_contexts()) {
     runtime_state_->io_mgr()->UnregisterContext(context);
   }
   exec_env_->thread_mgr()->UnregisterPool(runtime_state_->resource_pool());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/plan-fragment-executor.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h
index 91cece6..29250a3 100644
--- a/be/src/runtime/plan-fragment-executor.h
+++ b/be/src/runtime/plan-fragment-executor.h
@@ -24,6 +24,7 @@
 #include "common/status.h"
 #include "common/object-pool.h"
 #include "runtime/runtime-state.h"
+#include "util/runtime-profile-counters.h"
 #include "util/thread.h"

 namespace impala {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/row-batch-serialize-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch-serialize-test.cc b/be/src/runtime/row-batch-serialize-test.cc
index cb49bda..1f28440 100644
--- a/be/src/runtime/row-batch-serialize-test.cc
+++ b/be/src/runtime/row-batch-serialize-test.cc
@@ -15,6 +15,8 @@
 #include "testutil/gtest-util.h"
 #include "runtime/collection-value.h"
 #include "runtime/collection-value-builder.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/raw-value.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/row-batch.h"
 #include "runtime/tuple-row.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 3f2ba29..4197977 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -27,7 +27,6 @@
 #include "runtime/descriptors.h"
 #include "runtime/disk-io-mgr.h"
 #include "runtime/mem-pool.h"
-#include "runtime/mem-tracker.h"

 namespace impala {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/runtime-filter-bank.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
new file mode 100644
index 0000000..91f12c9
--- /dev/null
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -0,0 +1,222 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "runtime/runtime-filter-bank.h"
+
+#include "common/names.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
+#include "gutil/bits.h"
+#include "gutil/strings/substitute.h"
+#include "runtime/client-cache.h"
+#include "runtime/exec-env.h"
+#include "runtime/backend-client.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/runtime-filter.inline.h"
+#include "service/impala-server.h"
+#include "util/bloom-filter.h"
+
+using namespace impala;
+using namespace boost;
+using namespace strings;
+
+DEFINE_double(max_filter_error_rate, 0.75, "(Advanced) The maximum probability of false "
+    "positives in a runtime filter before it is disabled.");
+
+const int64_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE;
+const int64_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE;
+
+RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state)
+    : state_(state), closed_(false) {
+  memory_allocated_ =
+      state->runtime_profile()->AddCounter("BloomFilterBytes", TUnit::BYTES);
+
+  // Clamp bloom filter size down to the limits {MIN,MAX}_BLOOM_FILTER_SIZE
+  max_filter_size_ = query_ctx.request.query_options.runtime_filter_max_size;
+  max_filter_size_ = max<int64_t>(max_filter_size_, MIN_BLOOM_FILTER_SIZE);
+  max_filter_size_ =
+      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(max_filter_size_, MAX_BLOOM_FILTER_SIZE));
+
+  min_filter_size_ = query_ctx.request.query_options.runtime_filter_min_size;
+  min_filter_size_ = max<int64_t>(min_filter_size_, MIN_BLOOM_FILTER_SIZE);
+  min_filter_size_ =
+      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(min_filter_size_, MAX_BLOOM_FILTER_SIZE));
+
+  // Make sure that min <= max
+  min_filter_size_ = min<int64_t>(min_filter_size_, max_filter_size_);
+
+  DCHECK_GT(min_filter_size_, 0);
+  DCHECK_GT(max_filter_size_, 0);
+
+  default_filter_size_ = query_ctx.request.query_options.runtime_bloom_filter_size;
+  default_filter_size_ = max<int64_t>(default_filter_size_, min_filter_size_);
+  default_filter_size_ =
+      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(default_filter_size_, max_filter_size_));
+}
+
+RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filter_desc,
+    bool is_producer) {
+  RuntimeFilter* ret = obj_pool_.Add(
+      new RuntimeFilter(filter_desc, GetFilterSizeForNdv(filter_desc.ndv_estimate)));
+  lock_guard<mutex> l(runtime_filter_lock_);
+  if (is_producer) {
+    DCHECK(produced_filters_.find(filter_desc.filter_id) == produced_filters_.end());
+    produced_filters_[filter_desc.filter_id] = ret;
+  } else {
+    if (consumed_filters_.find(filter_desc.filter_id) == consumed_filters_.end()) {
+      consumed_filters_[filter_desc.filter_id] = ret;
+    } else {
+      // The filter has already been registered in this filter bank by another
+      // target node.
+      DCHECK_GT(filter_desc.targets.size(), 1);
+      ret = consumed_filters_[filter_desc.filter_id];
+    }
+  }
+  return ret;
+}
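[The constructor above pins each configured filter size into [MIN,MAX]_BLOOM_FILTER_SIZE and rounds up to a power of two, so the filter can mask bits instead of computing a modulo. A standalone sketch of that arithmetic, with a hand-rolled round-up in place of BitUtil::RoundUpToPowerOfTwo:]

#include <algorithm>
#include <cstdint>

int64_t ClampFilterSize(int64_t requested, int64_t min_limit, int64_t max_limit) {
  int64_t size = std::max(requested, min_limit);   // never below the floor
  size = std::min(size, max_limit);                // never above the ceiling
  int64_t pow2 = 1;
  while (pow2 < size) pow2 <<= 1;                  // round up, e.g. 1000 -> 1024
  return pow2;
}

[For example, ClampFilterSize(1000, 1024, 16 * 1024 * 1024) yields 1024: the request is first raised to the floor, and the result is already a power of two.]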
+ LOG(INFO) << "Couldn't send filter to coordinator: " << status.msg().msg(); + return; + } + TUpdateFilterResult res; + status = coord.DoRpc(&ImpalaBackendClient::UpdateFilter, params, &res); +} + +} + +void RuntimeFilterBank::UpdateFilterFromLocal(int32_t filter_id, + BloomFilter* bloom_filter) { + DCHECK_NE(state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF) + << "Should not be calling UpdateFilterFromLocal() if filtering is disabled"; + TUpdateFilterParams params; + // A runtime filter may have both local and remote targets. + bool has_local_target = false; + bool has_remote_target = false; + { + lock_guard<mutex> l(runtime_filter_lock_); + RuntimeFilterMap::iterator it = produced_filters_.find(filter_id); + DCHECK(it != produced_filters_.end()) << "Tried to update unregistered filter: " + << filter_id; + it->second->SetBloomFilter(bloom_filter); + has_local_target = it->second->filter_desc().has_local_targets; + has_remote_target = it->second->filter_desc().has_remote_targets; + } + + if (has_local_target) { + // Do a short circuit publication by pushing the same BloomFilter to the consumer + // side. + RuntimeFilter* filter; + { + lock_guard<mutex> l(runtime_filter_lock_); + RuntimeFilterMap::iterator it = consumed_filters_.find(filter_id); + if (it == consumed_filters_.end()) return; + filter = it->second; + } + filter->SetBloomFilter(bloom_filter); + state_->runtime_profile()->AddInfoString( + Substitute("Filter $0 arrival", filter_id), + PrettyPrinter::Print(filter->arrival_delay(), TUnit::TIME_MS)); + } + + if (has_remote_target + && state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) { + BloomFilter::ToThrift(bloom_filter, ¶ms.bloom_filter); + params.filter_id = filter_id; + params.query_id = state_->query_id(); + + ExecEnv::GetInstance()->rpc_pool()->Offer(bind<void>( + SendFilterToCoordinator, state_->query_ctx().coord_address, params, + ExecEnv::GetInstance()->impalad_client_cache())); + } +} + +void RuntimeFilterBank::PublishGlobalFilter(int32_t filter_id, + const TBloomFilter& thrift_filter) { + lock_guard<mutex> l(runtime_filter_lock_); + if (closed_) return; + RuntimeFilterMap::iterator it = consumed_filters_.find(filter_id); + DCHECK(it != consumed_filters_.end()) << "Tried to publish unregistered filter: " + << filter_id; + if (thrift_filter.always_true) { + it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER); + } else { + int64_t required_space = + BloomFilter::GetExpectedHeapSpaceUsed(thrift_filter.log_heap_space); + // Silently fail to publish the filter (replacing it with a 0-byte complete one) if + // there's not enough memory for it. 
+ if (!state_->query_mem_tracker()->TryConsume(required_space)) { + VLOG_QUERY << "No memory for global filter: " << filter_id + << " (fragment instance: " << state_->fragment_instance_id() << ")"; + it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER); + } else { + BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(thrift_filter)); + DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed()); + memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed()); + it->second->SetBloomFilter(bloom_filter); + } + } + state_->runtime_profile()->AddInfoString(Substitute("Filter $0 arrival", filter_id), + PrettyPrinter::Print(it->second->arrival_delay(), TUnit::TIME_MS)); +} + +BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilter(int32_t filter_id) { + lock_guard<mutex> l(runtime_filter_lock_); + if (closed_) return NULL; + + RuntimeFilterMap::iterator it = produced_filters_.find(filter_id); + DCHECK(it != produced_filters_.end()) << "Filter ID " << filter_id << " not registered"; + + // Track required space + int64_t log_filter_size = Bits::Log2Ceiling64(it->second->filter_size()); + int64_t required_space = BloomFilter::GetExpectedHeapSpaceUsed(log_filter_size); + if (!state_->query_mem_tracker()->TryConsume(required_space)) return NULL; + BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(log_filter_size)); + DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed()); + memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed()); + return bloom_filter; +} + +int64_t RuntimeFilterBank::GetFilterSizeForNdv(int64_t ndv) { + if (ndv == -1) return default_filter_size_; + int64_t required_space = + 1LL << BloomFilter::MinLogSpace(ndv, FLAGS_max_filter_error_rate); + required_space = max<int64_t>(required_space, min_filter_size_); + required_space = min<int64_t>(required_space, max_filter_size_); + return required_space; +} + +bool RuntimeFilterBank::FpRateTooHigh(int64_t filter_size, int64_t observed_ndv) { + double fpp = + BloomFilter::FalsePositiveProb(observed_ndv, Bits::Log2Ceiling64(filter_size)); + return fpp > FLAGS_max_filter_error_rate; +} + +void RuntimeFilterBank::Close() { + lock_guard<mutex> l(runtime_filter_lock_); + closed_ = true; + obj_pool_.Clear(); + state_->query_mem_tracker()->Release(memory_allocated_->value()); +} + http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/runtime-filter-bank.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-filter-bank.h b/be/src/runtime/runtime-filter-bank.h new file mode 100644 index 0000000..4703a0f --- /dev/null +++ b/be/src/runtime/runtime-filter-bank.h @@ -0,0 +1,149 @@ +// Copyright 2016 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
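GetFilterSizeForNdv() above turns the planner's NDV estimate into a filter size: compute the minimum space that keeps the expected false-positive rate under FLAGS_max_filter_error_rate, then clamp the result into [min_filter_size_, max_filter_size_]. A self-contained sketch of that arithmetic, substituting the textbook Bloom-filter bound for BloomFilter::MinLogSpace() (the bound is illustrative; Impala's exact space function may differ):

  #include <algorithm>
  #include <cmath>
  #include <cstdint>

  // Textbook bound: an optimal Bloom filter holding 'ndv' values at
  // false-positive rate 'fpp' needs about m = -n*ln(p)/(ln 2)^2 bits.
  // Returns the smallest log2(bytes) satisfying the bound.
  int MinLogSpace(int64_t ndv, double fpp) {
    double bits = -static_cast<double>(ndv) * std::log(fpp)
        / (std::log(2.0) * std::log(2.0));
    int64_t bytes = static_cast<int64_t>(std::ceil(bits / 8.0));
    int log2_bytes = 0;
    while ((1LL << log2_bytes) < bytes) ++log2_bytes;
    return log2_bytes;
  }

  int64_t GetFilterSizeForNdv(int64_t ndv, double max_error_rate,
      int64_t min_size, int64_t max_size, int64_t default_size) {
    if (ndv == -1) return default_size;  // no estimate: use the default
    int64_t required = 1LL << MinLogSpace(ndv, max_error_rate);
    required = std::max(required, min_size);  // min/max are powers of two
    required = std::min(required, max_size);
    return required;
  }

For example, at the default max_filter_error_rate of 0.75, a million distinct values cost roughly 0.6 bits each (about 73KB), which rounds up to a 128KB filter before clamping.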
+ +#ifndef IMPALA_RUNTIME_RUNTIME_FILTER_BANK_H +#define IMPALA_RUNTIME_RUNTIME_FILTER_BANK_H + +#include "common/object-pool.h" +#include "runtime/types.h" +#include "util/runtime-profile.h" + +#include <boost/thread/lock_guard.hpp> +#include <boost/unordered_map.hpp> + +namespace impala { + +class BloomFilter; +class RuntimeFilter; +class RuntimeState; +class TBloomFilter; +class TRuntimeFilterDesc; +class TQueryCtx; + +/// RuntimeFilters are produced and consumed by plan nodes at run time to propagate +/// predicates across the plan tree dynamically. Each fragment instance manages its +/// filters with a RuntimeFilterBank which provides low-synchronization access to filter +/// objects and data structures. +/// +/// A RuntimeFilterBank manages both production and consumption of filters. In the case +/// where a given filter is both consumed and produced by the same fragment, the +/// RuntimeFilterBank treats each filter independently. +/// +/// All filters must be registered with the filter bank via RegisterFilter(). Local plan +/// fragments update the bloom filters by calling UpdateFilterFromLocal() +/// (UpdateFilterFromLocal() may only be called once per filter ID per filter bank). The +/// bloom_filter that is passed into UpdateFilterFromLocal() must have been allocated by +/// AllocateScratchBloomFilter() (or be NULL); this allows RuntimeFilterBank to manage all +/// memory associated with filters. +/// +/// Filters are aggregated at the coordinator, and then made available to consumers after +/// PublishGlobalFilter() has been called. +/// +/// After PublishGlobalFilter() has been called (and again, it may only be called once per +/// filter_id), the RuntimeFilter object associated with filter_id will have a valid +/// bloom_filter, and may be used for filter evaluation. This operation occurs without +/// synchronisation, and neither the thread that calls PublishGlobalFilter() nor the +/// thread that may call RuntimeFilter::Eval() need to coordinate in any way. +class RuntimeFilterBank { + public: + RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state); + + /// Registers a filter that will either be produced (is_producer == true) or consumed + /// (is_producer == false) by fragments that share this RuntimeState. The filter + /// bloom_filter itself is unallocated until the first call to PublishGlobalFilter(). + RuntimeFilter* RegisterFilter(const TRuntimeFilterDesc& filter_desc, bool is_producer); + + /// Updates a filter's bloom_filter with 'bloom_filter' which has been produced by some + /// operator in the local fragment instance. 'bloom_filter' may be NULL, representing a + /// full filter that contains all elements. + void UpdateFilterFromLocal(int32_t filter_id, BloomFilter* bloom_filter); + + /// Makes a bloom_filter (aggregated globally from all producer fragments) available for + /// consumption by operators that wish to use it for filtering. + void PublishGlobalFilter(int32_t filter_id, const TBloomFilter& thrift_filter); + + /// Returns true if, according to the observed NDV in 'observed_ndv', a filter of size + /// 'filter_size' would have an expected false-positive rate which would exceed + /// FLAGS_max_filter_error_rate. + bool FpRateTooHigh(int64_t filter_size, int64_t observed_ndv); + + /// Returns a RuntimeFilter with the given filter id. This is safe to call after all + /// calls to RegisterFilter() have finished, and not before. Filters may be cached by + /// clients and subsequently accessed without synchronization.
Concurrent calls to + /// PublishGlobalFilter() will update a filter's bloom filter atomically, without the + /// need for client synchronization. + inline const RuntimeFilter* GetRuntimeFilter(int32_t filter_id); + + /// Returns a bloom_filter that can be used by an operator to produce a local filter, + /// which may then be used in UpdateFilterFromLocal(). The memory returned is owned by + /// the RuntimeFilterBank (which may transfer it to a RuntimeFilter subsequently), and + /// should not be deleted by the caller. The filter identified by 'filter_id' must have + /// been previously registered as a 'producer' by RegisterFilter(). + /// + /// If there is not enough memory, or if Close() has been called first, returns NULL. + BloomFilter* AllocateScratchBloomFilter(int32_t filter_id); + + /// Default hash seed to use when computing hashed values to insert into filters. + static const int32_t DefaultHashSeed() { return 1234; } + + /// Releases all memory allocated for BloomFilters. + void Close(); + + static const int64_t MIN_BLOOM_FILTER_SIZE = 4 * 1024; // 4KB + static const int64_t MAX_BLOOM_FILTER_SIZE = 16 * 1024 * 1024; // 16MB + + private: + /// Returns the space (in bytes) required for a filter to achieve the configured + /// maximum false-positive rate based on the expected NDV. If 'ndv' is -1 (i.e. no + /// estimate is known), the default filter size is returned. + int64_t GetFilterSizeForNdv(int64_t ndv); + + /// Lock protecting produced_filters_ and consumed_filters_. + boost::mutex runtime_filter_lock_; + + /// Map from filter id to a RuntimeFilter. + typedef boost::unordered_map<int32_t, RuntimeFilter*> RuntimeFilterMap; + + /// All filters expected to be produced by the local plan fragment instance. + RuntimeFilterMap produced_filters_; + + /// All filters expected to be consumed by the local plan fragment instance. + RuntimeFilterMap consumed_filters_; + + /// Fragment instance's runtime state. + RuntimeState* state_; + + /// Object pool to track allocated Bloom filters. + ObjectPool obj_pool_; + + /// True iff Close() has been called. Used to prevent races between + /// AllocateScratchBloomFilter() and Close(). + bool closed_; + + /// Total amount of memory allocated to Bloom filters. + RuntimeProfile::Counter* memory_allocated_; + + /// Precomputed default BloomFilter size. + int64_t default_filter_size_; + + /// Maximum filter size, in bytes, rounded up to a power of two. + int64_t max_filter_size_; + + /// Minimum filter size, in bytes, rounded up to a power of two.
+ int64_t min_filter_size_; +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/runtime-filter.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-filter.cc b/be/src/runtime/runtime-filter.cc index b0126ec..9ae6cbb 100644 --- a/be/src/runtime/runtime-filter.cc +++ b/be/src/runtime/runtime-filter.cc @@ -14,211 +14,14 @@ #include "runtime/runtime-filter.inline.h" +#include "util/time.h" + #include "common/names.h" -#include "gutil/bits.h" -#include "gutil/strings/substitute.h" -#include "runtime/client-cache.h" -#include "runtime/exec-env.h" -#include "runtime/backend-client.h" -#include "service/impala-server.h" -#include "util/bloom-filter.h" using namespace impala; -using namespace boost; -using namespace strings; - -DEFINE_double(max_filter_error_rate, 0.75, "(Advanced) The maximum probability of false " - "positives in a runtime filter before it is disabled."); const int RuntimeFilter::SLEEP_PERIOD_MS = 20; -const int64_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE; -const int64_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE; - -RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state) - : query_ctx_(query_ctx), state_(state), closed_(false) { - memory_allocated_ = - state->runtime_profile()->AddCounter("BloomFilterBytes", TUnit::BYTES); - - // Clamp bloom filter size down to the limits {MIN,MAX}_BLOOM_FILTER_SIZE - max_filter_size_ = query_ctx_.request.query_options.runtime_filter_max_size; - max_filter_size_ = max<int64_t>(max_filter_size_, MIN_BLOOM_FILTER_SIZE); - max_filter_size_ = - BitUtil::RoundUpToPowerOfTwo(min<int64_t>(max_filter_size_, MAX_BLOOM_FILTER_SIZE)); - - min_filter_size_ = query_ctx_.request.query_options.runtime_filter_min_size; - min_filter_size_ = max<int64_t>(min_filter_size_, MIN_BLOOM_FILTER_SIZE); - min_filter_size_ = - BitUtil::RoundUpToPowerOfTwo(min<int64_t>(min_filter_size_, MAX_BLOOM_FILTER_SIZE)); - - // Make sure that min <= max - min_filter_size_ = min<int64_t>(min_filter_size_, max_filter_size_); - - DCHECK_GT(min_filter_size_, 0); - DCHECK_GT(max_filter_size_, 0); - - default_filter_size_ = query_ctx_.request.query_options.runtime_bloom_filter_size; - default_filter_size_ = max<int64_t>(default_filter_size_, min_filter_size_); - default_filter_size_ = - BitUtil::RoundUpToPowerOfTwo(min<int64_t>(default_filter_size_, max_filter_size_)); -} - -RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filter_desc, - bool is_producer) { - RuntimeFilter* ret = obj_pool_.Add( - new RuntimeFilter(filter_desc, GetFilterSizeForNdv(filter_desc.ndv_estimate))); - lock_guard<mutex> l(runtime_filter_lock_); - if (is_producer) { - DCHECK(produced_filters_.find(filter_desc.filter_id) == produced_filters_.end()); - produced_filters_[filter_desc.filter_id] = ret; - } else { - if (consumed_filters_.find(filter_desc.filter_id) == consumed_filters_.end()) { - consumed_filters_[filter_desc.filter_id] = ret; - } else { - // The filter has already been registered in this filter bank by another - // target node. - DCHECK_GT(filter_desc.targets.size(), 1); - ret = consumed_filters_[filter_desc.filter_id]; - } - } - return ret; -} - -namespace { - -/// Sends a filter to the coordinator. Executed asynchronously in the context of -/// ExecEnv::rpc_pool(). 
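The class comment in the new header pins down the lifecycle: register a filter, fill it on the producer side, publish it (locally, or globally via the coordinator), and only then evaluate it on the consumer side. A hypothetical caller-side sketch of that sequence (the surrounding functions are invented glue; only the RuntimeFilterBank and RuntimeFilter calls come from this patch):

  // Producer fragment: fill a bank-owned scratch filter, then hand it back.
  void ProduceFilter(RuntimeFilterBank* bank, const TRuntimeFilterDesc& desc) {
    bank->RegisterFilter(desc, true /* is_producer */);
    BloomFilter* scratch = bank->AllocateScratchBloomFilter(desc.filter_id);
    if (scratch == NULL) return;  // not enough memory, or bank already closed
    // ... insert hashed build-side join keys into 'scratch' ...
    bank->UpdateFilterFromLocal(desc.filter_id, scratch);
  }

  // Consumer fragment: wait (bounded) for the aggregated filter to arrive.
  void ConsumeFilter(RuntimeFilterBank* bank, const TRuntimeFilterDesc& desc) {
    const RuntimeFilter* filter = bank->RegisterFilter(desc, false /* is_producer */);
    if (filter->WaitForArrival(1000 /* ms */)) {
      // ... probe scan rows against the filter before materializing them ...
    }
  }

UpdateFilterFromLocal() short-circuits delivery to consumers in the same fragment instance; remote targets only see the filter once the coordinator has aggregated all producer updates and called PublishGlobalFilter() on each backend.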
-void SendFilterToCoordinator(TNetworkAddress address, TUpdateFilterParams params, - ImpalaBackendClientCache* client_cache) { - Status status; - ImpalaBackendConnection coord(client_cache, address, &status); - if (!status.ok()) { - // Failing to send a filter is not a query-wide error - the remote fragment will - // continue regardless. - // TODO: Retry. - LOG(INFO) << "Couldn't send filter to coordinator: " << status.msg().msg(); - return; - } - TUpdateFilterResult res; - status = coord.DoRpc(&ImpalaBackendClient::UpdateFilter, params, &res); -} - -} - -void RuntimeFilterBank::UpdateFilterFromLocal(int32_t filter_id, - BloomFilter* bloom_filter) { - DCHECK_NE(state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF) - << "Should not be calling UpdateFilterFromLocal() if filtering is disabled"; - TUpdateFilterParams params; - // A runtime filter may have both local and remote targets. - bool has_local_target = false; - bool has_remote_target = false; - { - lock_guard<mutex> l(runtime_filter_lock_); - RuntimeFilterMap::iterator it = produced_filters_.find(filter_id); - DCHECK(it != produced_filters_.end()) << "Tried to update unregistered filter: " - << filter_id; - it->second->SetBloomFilter(bloom_filter); - has_local_target = it->second->filter_desc().has_local_targets; - has_remote_target = it->second->filter_desc().has_remote_targets; - } - - if (has_local_target) { - // Do a short circuit publication by pushing the same BloomFilter to the consumer - // side. - RuntimeFilter* filter; - { - lock_guard<mutex> l(runtime_filter_lock_); - RuntimeFilterMap::iterator it = consumed_filters_.find(filter_id); - if (it == consumed_filters_.end()) return; - filter = it->second; - } - filter->SetBloomFilter(bloom_filter); - state_->runtime_profile()->AddInfoString( - Substitute("Filter $0 arrival", filter_id), - PrettyPrinter::Print(filter->arrival_delay(), TUnit::TIME_MS)); - } - - if (has_remote_target - && state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) { - BloomFilter::ToThrift(bloom_filter, ¶ms.bloom_filter); - params.filter_id = filter_id; - params.query_id = query_ctx_.query_id; - - ExecEnv::GetInstance()->rpc_pool()->Offer(bind<void>( - SendFilterToCoordinator, query_ctx_.coord_address, params, - ExecEnv::GetInstance()->impalad_client_cache())); - } -} - -void RuntimeFilterBank::PublishGlobalFilter(int32_t filter_id, - const TBloomFilter& thrift_filter) { - lock_guard<mutex> l(runtime_filter_lock_); - if (closed_) return; - RuntimeFilterMap::iterator it = consumed_filters_.find(filter_id); - DCHECK(it != consumed_filters_.end()) << "Tried to publish unregistered filter: " - << filter_id; - if (thrift_filter.always_true) { - it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER); - } else { - int64_t required_space = - BloomFilter::GetExpectedHeapSpaceUsed(thrift_filter.log_heap_space); - // Silently fail to publish the filter (replacing it with a 0-byte complete one) if - // there's not enough memory for it. 
- if (!state_->query_mem_tracker()->TryConsume(required_space)) { - VLOG_QUERY << "No memory for global filter: " << filter_id - << " (fragment instance: " << state_->fragment_instance_id() << ")"; - it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER); - } else { - BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(thrift_filter)); - DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed()); - memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed()); - it->second->SetBloomFilter(bloom_filter); - } - } - state_->runtime_profile()->AddInfoString(Substitute("Filter $0 arrival", filter_id), - PrettyPrinter::Print(it->second->arrival_delay(), TUnit::TIME_MS)); -} - -BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilter(int32_t filter_id) { - lock_guard<mutex> l(runtime_filter_lock_); - if (closed_) return NULL; - - RuntimeFilterMap::iterator it = produced_filters_.find(filter_id); - DCHECK(it != produced_filters_.end()) << "Filter ID " << filter_id << " not registered"; - - // Track required space - int64_t log_filter_size = Bits::Log2Ceiling64(it->second->filter_size()); - int64_t required_space = BloomFilter::GetExpectedHeapSpaceUsed(log_filter_size); - if (!state_->query_mem_tracker()->TryConsume(required_space)) return NULL; - BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(log_filter_size)); - DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed()); - memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed()); - return bloom_filter; -} - -int64_t RuntimeFilterBank::GetFilterSizeForNdv(int64_t ndv) { - if (ndv == -1) return default_filter_size_; - int64_t required_space = - 1LL << BloomFilter::MinLogSpace(ndv, FLAGS_max_filter_error_rate); - required_space = max<int64_t>(required_space, min_filter_size_); - required_space = min<int64_t>(required_space, max_filter_size_); - return required_space; -} - -bool RuntimeFilterBank::FpRateTooHigh(int64_t filter_size, int64_t observed_ndv) { - double fpp = - BloomFilter::FalsePositiveProb(observed_ndv, Bits::Log2Ceiling64(filter_size)); - return fpp > FLAGS_max_filter_error_rate; -} - -void RuntimeFilterBank::Close() { - lock_guard<mutex> l(runtime_filter_lock_); - closed_ = true; - obj_pool_.Clear(); - state_->query_mem_tracker()->Release(memory_allocated_->value()); -} - bool RuntimeFilter::WaitForArrival(int32_t timeout_ms) const { do { if (HasBloomFilter()) return true; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/runtime-filter.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h index 178c03f..4168704 100644 --- a/be/src/runtime/runtime-filter.h +++ b/be/src/runtime/runtime-filter.h @@ -16,135 +16,14 @@ #ifndef IMPALA_RUNTIME_RUNTIME_FILTER_H #define IMPALA_RUNTIME_RUNTIME_FILTER_H -#include <boost/unordered_map.hpp> - -#include "common/object-pool.h" -#include "gen-cpp/ImpalaInternalService_types.h" -#include "gen-cpp/PlanNodes_types.h" -#include "runtime/types.h" -#include "util/runtime-profile.h" +#include "runtime/raw-value.h" +#include "runtime/runtime-filter-bank.h" +#include "util/bloom-filter.h" #include "util/spinlock.h" namespace impala { class BloomFilter; -class RuntimeFilter; -class RuntimeState; - -/// RuntimeFilters are produced and consumed by plan nodes at run time to propagate -/// predicates across the plan tree dynamically. 
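The code being deleted above (relocated into runtime-filter-bank.cc earlier in this patch) repeats one memory-accounting pattern: TryConsume() against the query MemTracker before allocating, record the bytes in the BloomFilterBytes counter, and Release() the counter's running total exactly once in Close(). A simplified standalone sketch of the pattern (Tracker and Counter are minimal stand-ins, not Impala's MemTracker or RuntimeProfile::Counter):

  #include <atomic>
  #include <cstdint>

  struct Tracker {  // stand-in for the query MemTracker
    explicit Tracker(int64_t limit) : limit_(limit) {}
    bool TryConsume(int64_t bytes) {
      int64_t prev = used_.fetch_add(bytes);
      if (prev + bytes > limit_) {  // over budget: roll back, refuse
        used_.fetch_sub(bytes);
        return false;
      }
      return true;
    }
    void Release(int64_t bytes) { used_.fetch_sub(bytes); }
    std::atomic<int64_t> used_{0};
    int64_t limit_;
  };

  struct Counter {  // stand-in for RuntimeProfile::Counter
    void Add(int64_t x) { total_ += x; }
    int64_t value() const { return total_.load(); }
    std::atomic<int64_t> total_{0};
  };

  // Allocation site: reserve first; allocate only if the reservation sticks.
  char* AllocateFilterBytes(Tracker* mem, Counter* allocated, int64_t bytes) {
    if (!mem->TryConsume(bytes)) return nullptr;  // degrade silently, as above
    allocated->Add(bytes);
    return new char[bytes];
  }

  // Close(): a single Release() covering every successful allocation.
  void CloseBank(Tracker* mem, Counter* allocated) {
    mem->Release(allocated->value());
  }

Accumulating into the counter and releasing once keeps Close() cheap, and the same counter doubles as the BloomFilterBytes entry in the query profile.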
Each fragment instance manages its -/// filters with a RuntimeFilterBank which provides low-synchronization access to filter -/// objects and data structures. -/// -/// A RuntimeFilterBank manages both production and consumption of filters. In the case -/// where a given filter is both consumed and produced by the same fragment, the -/// RuntimeFilterBank treats each filter independently. -/// -/// All filters must be registered with the filter bank via RegisterFilter(). Local plan -/// fragments update the bloom filters by calling UpdateFilterFromLocal() -/// (UpdateFilterFromLocal() may only be called once per filter ID per filter bank). The -/// bloom_filter that is passed into UpdateFilterFromLocal() must have been allocated by -/// AllocateScratchBloomFilter() (or be NULL); this allows RuntimeFilterBank to manage all -/// memory associated with filters. -/// -/// Filters are aggregated at the coordinator, and then made available to consumers after -/// PublishGlobalFilter() has been called. -/// -/// After PublishGlobalFilter() has been called (and again, it may only be called once per -/// filter_id), the RuntimeFilter object associated with filter_id will have a valid -/// bloom_filter, and may be used for filter evaluation. This operation occurs without -/// synchronisation, and neither the thread that calls PublishGlobalFilter() nor the -/// thread that may call RuntimeFilter::Eval() need to coordinate in any way. -class RuntimeFilterBank { - public: - RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state); - - /// Registers a filter that will either be produced (is_producer == false) or consumed - /// (is_producer == true) by fragments that share this RuntimeState. The filter - /// bloom_filter itself is unallocated until the first call to PublishGlobalFilter(). - RuntimeFilter* RegisterFilter(const TRuntimeFilterDesc& filter_desc, bool is_producer); - - /// Updates a filter's bloom_filter with 'bloom_filter' which has been produced by some - /// operator in the local fragment instance. 'bloom_filter' may be NULL, representing a - /// full filter that contains all elements. - void UpdateFilterFromLocal(int32_t filter_id, BloomFilter* bloom_filter); - - /// Makes a bloom_filter (aggregated globally from all producer fragments) available for - /// consumption by operators that wish to use it for filtering. - void PublishGlobalFilter(int32_t filter_id, const TBloomFilter& thrift_filter); - - /// Returns true if, according to the observed NDV in 'observed_ndv', a filter of size - /// 'filter_size' would have an expected false-positive rate which would exceed - /// FLAGS_max_filter_error_rate. - bool FpRateTooHigh(int64_t filter_size, int64_t observed_ndv); - - /// Returns a RuntimeFilter with the given filter id. This is safe to call after all - /// calls to RegisterFilter() have finished, and not before. Filters may be cached by - /// clients and subsequently accessed without synchronization. Concurrent calls to - /// PublishGlobalFilter() will update a filter's bloom filter atomically, without the - /// need for client synchronization. - inline const RuntimeFilter* GetRuntimeFilter(int32_t filter_id); - - /// Returns a bloom_filter that can be used by an operator to produce a local filter, - /// which may then be used in UpdateFilterFromLocal(). The memory returned is owned by - /// the RuntimeFilterBank (which may transfer it to a RuntimeFilter subsequently), and - /// should not be deleted by the caller. 
The filter identified by 'filter_id' must have - /// been previously registered as a 'producer' by RegisterFilter(). - /// - /// If there is not enough memory, or if Close() has been called first, returns NULL. - BloomFilter* AllocateScratchBloomFilter(int32_t filter_id); - - /// Default hash seed to use when computing hashed values to insert into filters. - static const int32_t DefaultHashSeed() { return 1234; } - - /// Releases all memory allocated for BloomFilters. - void Close(); - - static const int64_t MIN_BLOOM_FILTER_SIZE = 4 * 1024; // 4KB - static const int64_t MAX_BLOOM_FILTER_SIZE = 16 * 1024 * 1024; // 16MB - - private: - /// Returns the the space (in bytes) required for a filter to achieve the configured - /// maximum false-positive rate based on the expected NDV. If 'ndv' is -1 (i.e. no - /// estimate is known), the default filter size is returned. - int64_t GetFilterSizeForNdv(int64_t ndv); - - const TQueryCtx query_ctx_; - - /// Lock protecting produced_filters_ and consumed_filters_. - boost::mutex runtime_filter_lock_; - - /// Map from filter id to a RuntimeFilter. - typedef boost::unordered_map<int32_t, RuntimeFilter*> RuntimeFilterMap; - - /// All filters expected to be produced by the local plan fragment instance. - RuntimeFilterMap produced_filters_; - - /// All filters expected to be consumed by the local plan fragment instance. - RuntimeFilterMap consumed_filters_; - - /// Fragment instance's runtime state. - RuntimeState* state_; - - /// Object pool to track allocated Bloom filters. - ObjectPool obj_pool_; - - /// True iff Close() has been called. Used to prevent races between - /// AllocateScratchBloomFilter() and Close(). - bool closed_; - - /// Total amount of memory allocated to Bloom Filters - RuntimeProfile::Counter* memory_allocated_; - - /// Precomputed default BloomFilter size. - int64_t default_filter_size_; - - /// Maximum filter size, in bytes, rounded up to a power of two. - int64_t max_filter_size_; - - /// Minimum filter size, in bytes, rounded up to a power of two. - int64_t min_filter_size_; -}; /// RuntimeFilters represent set-membership predicates (implemented with bloom filters) /// that are computed during query execution (rather than during planning). They can then
