http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index f1e243c..24217de 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -26,7 +26,8 @@
 #include <gutil/strings/join.h>
 #include <gutil/strings/substitute.h>
 
-#include "runtime/disk-io-mgr-reader-context.h"
+#include "runtime/io/disk-io-mgr.h"
+#include "runtime/io/request-context.h"
 #include "runtime/runtime-state.h"
 #include "runtime/tmp-file-mgr-internal.h"
 #include "util/bit-util.h"
@@ -52,6 +53,7 @@ using boost::algorithm::token_compress_on;
 using boost::filesystem::absolute;
 using boost::filesystem::path;
 using boost::uuids::random_generator;
+using namespace impala::io;
 using namespace strings;
 
 namespace impala {
@@ -358,7 +360,7 @@ Status TmpFileMgr::FileGroup::Write(
 
   unique_ptr<WriteHandle> tmp_handle(new WriteHandle(encryption_timer_, cb));
   WriteHandle* tmp_handle_ptr = tmp_handle.get(); // Pass ptr by value into 
lambda.
-  DiskIoMgr::WriteRange::WriteDoneCallback callback = [this, tmp_handle_ptr](
+  WriteRange::WriteDoneCallback callback = [this, tmp_handle_ptr](
       const Status& write_status) { WriteComplete(tmp_handle_ptr, 
write_status); };
   RETURN_IF_ERROR(
       tmp_handle->Write(io_mgr_, io_ctx_.get(), tmp_file, file_offset, buffer, 
callback));
@@ -387,11 +389,11 @@ Status TmpFileMgr::FileGroup::ReadAsync(WriteHandle* 
handle, MemRange buffer) {
   DCHECK(handle->write_range_ != nullptr);
   // Don't grab handle->write_state_lock_, it is safe to touch all of handle's state
   // since the write is not in flight.
-  handle->read_range_ = scan_range_pool_.Add(new DiskIoMgr::ScanRange);
+  handle->read_range_ = scan_range_pool_.Add(new ScanRange);
   handle->read_range_->Reset(nullptr, handle->write_range_->file(),
       handle->write_range_->len(), handle->write_range_->offset(),
       handle->write_range_->disk_id(), false,
-      DiskIoMgr::BufferOpts::ReadInto(buffer.data(), buffer.len()));
+      BufferOpts::ReadInto(buffer.data(), buffer.len()));
   read_counter_->Add(1);
   bytes_read_counter_->Add(buffer.len());
   RETURN_IF_ERROR(io_mgr_->AddScanRange(io_ctx_.get(), handle->read_range_, 
true));
@@ -403,7 +405,7 @@ Status TmpFileMgr::FileGroup::WaitForAsyncRead(WriteHandle* 
handle, MemRange buf
   // Don't grab handle->write_state_lock_, it is safe to touch all of handle's state
   // since the write is not in flight.
   SCOPED_TIMER(disk_read_timer_);
-  unique_ptr<DiskIoMgr::BufferDescriptor> io_mgr_buffer;
+  unique_ptr<BufferDescriptor> io_mgr_buffer;
   Status status = handle->read_range_->GetNext(&io_mgr_buffer);
   if (!status.ok()) goto exit;
   DCHECK(io_mgr_buffer != NULL);
@@ -525,9 +527,9 @@ string TmpFileMgr::WriteHandle::TmpFilePath() const {
   return file_->path();
 }
 
-Status TmpFileMgr::WriteHandle::Write(DiskIoMgr* io_mgr, DiskIoRequestContext* 
io_ctx,
+Status TmpFileMgr::WriteHandle::Write(DiskIoMgr* io_mgr, RequestContext* 
io_ctx,
     File* file, int64_t offset, MemRange buffer,
-    DiskIoMgr::WriteRange::WriteDoneCallback callback) {
+    WriteRange::WriteDoneCallback callback) {
   DCHECK(!write_in_flight_);
 
   if (FLAGS_disk_spill_encryption) RETURN_IF_ERROR(EncryptAndHash(buffer));
@@ -536,7 +538,7 @@ Status TmpFileMgr::WriteHandle::Write(DiskIoMgr* io_mgr, 
DiskIoRequestContext* i
   // WriteComplete() may be called concurrently with the remainder of this function.
   file_ = file;
   write_range_.reset(
-      new DiskIoMgr::WriteRange(file->path(), offset, file->AssignDiskQueue(), 
callback));
+      new WriteRange(file->path(), offset, file->AssignDiskQueue(), callback));
   write_range_->SetData(buffer.data(), buffer.len());
   write_in_flight_ = true;
   Status status = io_mgr->AddWriteRange(io_ctx, write_range_.get());
@@ -553,7 +555,7 @@ Status TmpFileMgr::WriteHandle::Write(DiskIoMgr* io_mgr, 
DiskIoRequestContext* i
 }
 
 Status TmpFileMgr::WriteHandle::RetryWrite(
-    DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file, int64_t 
offset) {
+    DiskIoMgr* io_mgr, RequestContext* io_ctx, File* file, int64_t offset) {
   DCHECK(write_in_flight_);
   file_ = file;
   write_range_->SetRange(file->path(), offset, file->AssignDiskQueue());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/tmp-file-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index f550af2..95072ae 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -28,10 +28,10 @@
 #include "common/object-pool.h"
 #include "common/status.h"
 #include "gen-cpp/Types_types.h" // for TUniqueId
-#include "runtime/disk-io-mgr.h"
-#include "util/mem-range.h"
+#include "runtime/io/request-ranges.h"
 #include "util/collection-metrics.h"
 #include "util/condition-variable.h"
+#include "util/mem-range.h"
 #include "util/openssl-util.h"
 #include "util/runtime-profile.h"
 #include "util/spinlock.h"
@@ -100,7 +100,7 @@ class TmpFileMgr {
     /// space used. 'unique_id' is a unique ID that is used to prefix any scratch file
     /// names. It is an error to create multiple FileGroups with the same 'unique_id'.
     /// 'bytes_limit' is the limit on the total file space to allocate.
-    FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr, RuntimeProfile* 
profile,
+    FileGroup(TmpFileMgr* tmp_file_mgr, io::DiskIoMgr* io_mgr, RuntimeProfile* 
profile,
         const TUniqueId& unique_id, int64_t bytes_limit = -1);
 
     ~FileGroup();
@@ -198,10 +198,10 @@ class TmpFileMgr {
     TmpFileMgr* const tmp_file_mgr_;
 
     /// DiskIoMgr used for all I/O to temporary files.
-    DiskIoMgr* const io_mgr_;
+    io::DiskIoMgr* const io_mgr_;
 
     /// I/O context used for all reads and writes. Registered in constructor.
-    std::unique_ptr<DiskIoRequestContext> io_ctx_;
+    std::unique_ptr<io::RequestContext> io_ctx_;
 
     /// Stores scan ranges allocated in Read(). Needed because ScanRange objects may be
     /// touched by DiskIoMgr even after the scan is finished.
@@ -303,14 +303,14 @@ class TmpFileMgr {
     /// Starts a write of 'buffer' to 'offset' of 'file'. 'write_in_flight_' must be false
     /// before calling. After returning, 'write_in_flight_' is true on success or false on
     /// failure and 'is_cancelled_' is set to true on failure.
-    Status Write(DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file,
+    Status Write(io::DiskIoMgr* io_mgr, io::RequestContext* io_ctx, File* file,
         int64_t offset, MemRange buffer,
-        DiskIoMgr::WriteRange::WriteDoneCallback callback) WARN_UNUSED_RESULT;
+        io::WriteRange::WriteDoneCallback callback) WARN_UNUSED_RESULT;
 
     /// Retry the write after the initial write failed with an error, instead writing to
     /// 'offset' of 'file'. 'write_in_flight_' must be true before calling.
     /// After returning, 'write_in_flight_' is true on success or false on failure.
-    Status RetryWrite(DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* 
file,
+    Status RetryWrite(io::DiskIoMgr* io_mgr, io::RequestContext* io_ctx, File* 
file,
         int64_t offset) WARN_UNUSED_RESULT;
 
     /// Called when the write has completed successfully or not. Sets 'write_in_flight_'
@@ -340,7 +340,7 @@ class TmpFileMgr {
     RuntimeProfile::Counter* encryption_timer_;
 
     /// The DiskIoMgr write range for this write.
-    boost::scoped_ptr<DiskIoMgr::WriteRange> write_range_;
+    boost::scoped_ptr<io::WriteRange> write_range_;
 
     /// The temporary file being written to.
     File* file_;
@@ -355,7 +355,7 @@ class TmpFileMgr {
 
     /// The scan range for the read that is currently in flight. NULL when no read is in
     /// flight.
-    DiskIoMgr::ScanRange* read_range_;
+    io::ScanRange* read_range_;
 
     /// Protects all fields below while 'write_in_flight_' is true. At other times, it is
     /// invalid to call WriteRange/FileGroup methods concurrently from multiple threads,

Reply via email to