http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/tmp-file-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc index f1e243c..24217de 100644 --- a/be/src/runtime/tmp-file-mgr.cc +++ b/be/src/runtime/tmp-file-mgr.cc @@ -26,7 +26,8 @@ #include <gutil/strings/join.h> #include <gutil/strings/substitute.h> -#include "runtime/disk-io-mgr-reader-context.h" +#include "runtime/io/disk-io-mgr.h" +#include "runtime/io/request-context.h" #include "runtime/runtime-state.h" #include "runtime/tmp-file-mgr-internal.h" #include "util/bit-util.h" @@ -52,6 +53,7 @@ using boost::algorithm::token_compress_on; using boost::filesystem::absolute; using boost::filesystem::path; using boost::uuids::random_generator; +using namespace impala::io; using namespace strings; namespace impala { @@ -358,7 +360,7 @@ Status TmpFileMgr::FileGroup::Write( unique_ptr<WriteHandle> tmp_handle(new WriteHandle(encryption_timer_, cb)); WriteHandle* tmp_handle_ptr = tmp_handle.get(); // Pass ptr by value into lambda. - DiskIoMgr::WriteRange::WriteDoneCallback callback = [this, tmp_handle_ptr]( + WriteRange::WriteDoneCallback callback = [this, tmp_handle_ptr]( const Status& write_status) { WriteComplete(tmp_handle_ptr, write_status); }; RETURN_IF_ERROR( tmp_handle->Write(io_mgr_, io_ctx_.get(), tmp_file, file_offset, buffer, callback)); @@ -387,11 +389,11 @@ Status TmpFileMgr::FileGroup::ReadAsync(WriteHandle* handle, MemRange buffer) { DCHECK(handle->write_range_ != nullptr); // Don't grab handle->write_state_lock_, it is safe to touch all of handle's state // since the write is not in flight. - handle->read_range_ = scan_range_pool_.Add(new DiskIoMgr::ScanRange); + handle->read_range_ = scan_range_pool_.Add(new ScanRange); handle->read_range_->Reset(nullptr, handle->write_range_->file(), handle->write_range_->len(), handle->write_range_->offset(), handle->write_range_->disk_id(), false, - DiskIoMgr::BufferOpts::ReadInto(buffer.data(), buffer.len())); + BufferOpts::ReadInto(buffer.data(), buffer.len())); read_counter_->Add(1); bytes_read_counter_->Add(buffer.len()); RETURN_IF_ERROR(io_mgr_->AddScanRange(io_ctx_.get(), handle->read_range_, true)); @@ -403,7 +405,7 @@ Status TmpFileMgr::FileGroup::WaitForAsyncRead(WriteHandle* handle, MemRange buf // Don't grab handle->write_state_lock_, it is safe to touch all of handle's state // since the write is not in flight. SCOPED_TIMER(disk_read_timer_); - unique_ptr<DiskIoMgr::BufferDescriptor> io_mgr_buffer; + unique_ptr<BufferDescriptor> io_mgr_buffer; Status status = handle->read_range_->GetNext(&io_mgr_buffer); if (!status.ok()) goto exit; DCHECK(io_mgr_buffer != NULL); @@ -525,9 +527,9 @@ string TmpFileMgr::WriteHandle::TmpFilePath() const { return file_->path(); } -Status TmpFileMgr::WriteHandle::Write(DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, +Status TmpFileMgr::WriteHandle::Write(DiskIoMgr* io_mgr, RequestContext* io_ctx, File* file, int64_t offset, MemRange buffer, - DiskIoMgr::WriteRange::WriteDoneCallback callback) { + WriteRange::WriteDoneCallback callback) { DCHECK(!write_in_flight_); if (FLAGS_disk_spill_encryption) RETURN_IF_ERROR(EncryptAndHash(buffer)); @@ -536,7 +538,7 @@ Status TmpFileMgr::WriteHandle::Write(DiskIoMgr* io_mgr, DiskIoRequestContext* i // WriteComplete() may be called concurrently with the remainder of this function. file_ = file; write_range_.reset( - new DiskIoMgr::WriteRange(file->path(), offset, file->AssignDiskQueue(), callback)); + new WriteRange(file->path(), offset, file->AssignDiskQueue(), callback)); write_range_->SetData(buffer.data(), buffer.len()); write_in_flight_ = true; Status status = io_mgr->AddWriteRange(io_ctx, write_range_.get()); @@ -553,7 +555,7 @@ Status TmpFileMgr::WriteHandle::Write(DiskIoMgr* io_mgr, DiskIoRequestContext* i } Status TmpFileMgr::WriteHandle::RetryWrite( - DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file, int64_t offset) { + DiskIoMgr* io_mgr, RequestContext* io_ctx, File* file, int64_t offset) { DCHECK(write_in_flight_); file_ = file; write_range_->SetRange(file->path(), offset, file->AssignDiskQueue());
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/tmp-file-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h index f550af2..95072ae 100644 --- a/be/src/runtime/tmp-file-mgr.h +++ b/be/src/runtime/tmp-file-mgr.h @@ -28,10 +28,10 @@ #include "common/object-pool.h" #include "common/status.h" #include "gen-cpp/Types_types.h" // for TUniqueId -#include "runtime/disk-io-mgr.h" -#include "util/mem-range.h" +#include "runtime/io/request-ranges.h" #include "util/collection-metrics.h" #include "util/condition-variable.h" +#include "util/mem-range.h" #include "util/openssl-util.h" #include "util/runtime-profile.h" #include "util/spinlock.h" @@ -100,7 +100,7 @@ class TmpFileMgr { /// space used. 'unique_id' is a unique ID that is used to prefix any scratch file /// names. It is an error to create multiple FileGroups with the same 'unique_id'. /// 'bytes_limit' is the limit on the total file space to allocate. - FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr, RuntimeProfile* profile, + FileGroup(TmpFileMgr* tmp_file_mgr, io::DiskIoMgr* io_mgr, RuntimeProfile* profile, const TUniqueId& unique_id, int64_t bytes_limit = -1); ~FileGroup(); @@ -198,10 +198,10 @@ class TmpFileMgr { TmpFileMgr* const tmp_file_mgr_; /// DiskIoMgr used for all I/O to temporary files. - DiskIoMgr* const io_mgr_; + io::DiskIoMgr* const io_mgr_; /// I/O context used for all reads and writes. Registered in constructor. - std::unique_ptr<DiskIoRequestContext> io_ctx_; + std::unique_ptr<io::RequestContext> io_ctx_; /// Stores scan ranges allocated in Read(). Needed because ScanRange objects may be /// touched by DiskIoMgr even after the scan is finished. @@ -303,14 +303,14 @@ class TmpFileMgr { /// Starts a write of 'buffer' to 'offset' of 'file'. 'write_in_flight_' must be false /// before calling. After returning, 'write_in_flight_' is true on success or false on /// failure and 'is_cancelled_' is set to true on failure. - Status Write(DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file, + Status Write(io::DiskIoMgr* io_mgr, io::RequestContext* io_ctx, File* file, int64_t offset, MemRange buffer, - DiskIoMgr::WriteRange::WriteDoneCallback callback) WARN_UNUSED_RESULT; + io::WriteRange::WriteDoneCallback callback) WARN_UNUSED_RESULT; /// Retry the write after the initial write failed with an error, instead writing to /// 'offset' of 'file'. 'write_in_flight_' must be true before calling. /// After returning, 'write_in_flight_' is true on success or false on failure. - Status RetryWrite(DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file, + Status RetryWrite(io::DiskIoMgr* io_mgr, io::RequestContext* io_ctx, File* file, int64_t offset) WARN_UNUSED_RESULT; /// Called when the write has completed successfully or not. Sets 'write_in_flight_' @@ -340,7 +340,7 @@ class TmpFileMgr { RuntimeProfile::Counter* encryption_timer_; /// The DiskIoMgr write range for this write. - boost::scoped_ptr<DiskIoMgr::WriteRange> write_range_; + boost::scoped_ptr<io::WriteRange> write_range_; /// The temporary file being written to. File* file_; @@ -355,7 +355,7 @@ class TmpFileMgr { /// The scan range for the read that is currently in flight. NULL when no read is in /// flight. - DiskIoMgr::ScanRange* read_range_; + io::ScanRange* read_range_; /// Protects all fields below while 'write_in_flight_' is true. At other times, it is /// invalid to call WriteRange/FileGroup methods concurrently from multiple threads,