IMPALA-3202,IMPALA-2079: rework scratch file I/O

Refactor BufferedBlockMgr/TmpFileMgr to push more I/O logic into
TmpFileMgr, in anticipation of it being shared with BufferPool.
TmpFileMgr now handles:
* Scratch space allocation and recycling
* Read and write I/O

The interface is also greatly changed so that it is built around Write()
and Read() calls, abstracting away the details of temporary file
allocation from clients. This means the TmpFileMgr::File class can
be hidden from clients.

Write error recovery:
Also implement write error recovery in TmpFileMgr.

If an error occurs while writing to scratch and we have multiple
scratch directories, we will try one of the other directories
before cancelling the query. File-level blacklisting is used to
prevent excessive repeated attempts to resize a scratch file during
a single query. Device-level blacklisting is not implemented because
it is problematic to permanently take a scratch directory out of use.

To reduce the number of error paths, all I/O errors are now handled
asynchronously. Previously errors creating or extending the file were
returned synchronously from WriteUnpinnedBlock(). This required
modifying DiskIoMgr to create the file if not present when opened.

Also set the default max_errors value in the thrift definition file,
so that it is in effect for backend tests.

Future Work:
* Support for recycling variable-length scratch file ranges. I omitted
  this to avoid making the patch even larger.

Testing:
Updated BufferedBlockMgr unit test to reflect changes in behaviour:
* Scratch space is no longer permanently associated with a block, and
  is remapped every time a new block is written to disk.
* Files are now blacklisted - updated existing tests and enabled the
  previously disabled blacklisting test.

Added some basic testing of recycling of scratch file ranges in
the TmpFileMgr unit test.

I also manually tested the code in two ways. First by removing permissions
for /tmp/impala-scratch and ensuring that a spilling query fails cleanly.
Second, by creating a tiny ramdisk (16M) and running with two scratch
directories: one on /tmp and one on the tiny ramdisk. When spilling, an
out-of-space error is encountered for the tiny ramdisk and Impala spills
the remaining data (72M) to /tmp.

Change-Id: I8c9c587df006d2f09d72dd636adafbd295fcdc17
Reviewed-on: http://gerrit.cloudera.org:8080/5141
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/95ed4434
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/95ed4434
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/95ed4434

Branch: refs/heads/master
Commit: 95ed4434f2f446e214934f7dc251b843c1d6b0a6
Parents: 6b90aa3
Author: Tim Armstrong <[email protected]>
Authored: Fri Sep 4 10:54:11 2015 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Jan 5 02:26:24 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/buffered-block-mgr-test.cc       | 159 +++---
 be/src/runtime/buffered-block-mgr.cc            | 225 +++------
 be/src/runtime/buffered-block-mgr.h             |  90 ++--
 be/src/runtime/disk-io-mgr-test.cc              |  14 +-
 be/src/runtime/disk-io-mgr.cc                   |  42 +-
 be/src/runtime/disk-io-mgr.h                    |  19 +-
 be/src/runtime/exec-env.cc                      |   8 +-
 be/src/runtime/exec-env.h                       |   2 +-
 be/src/runtime/query-state.cc                   |   2 -
 be/src/runtime/tmp-file-mgr-internal.h          |  93 ++++
 be/src/runtime/tmp-file-mgr-test.cc             | 322 ++++++++----
 be/src/runtime/tmp-file-mgr.cc                  | 504 ++++++++++++++-----
 be/src/runtime/tmp-file-mgr.h                   | 430 ++++++++++------
 be/src/util/disk-info.cc                        |   1 -
 be/src/util/disk-info.h                         |  24 +-
 be/src/util/filesystem-util.cc                  |  11 -
 be/src/util/filesystem-util.h                   |   3 -
 be/src/util/mem-range.h                         |  47 ++
 common/thrift/ImpalaInternalService.thrift      |   2 +-
 .../functional-query/queries/QueryTest/set.test |   8 +-
 tests/custom_cluster/test_scratch_disk.py       |  72 +--
 21 files changed, 1331 insertions(+), 747 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/buffered-block-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr-test.cc 
b/be/src/runtime/buffered-block-mgr-test.cc
index 1828ff8..9b616f5 100644
--- a/be/src/runtime/buffered-block-mgr-test.cc
+++ b/be/src/runtime/buffered-block-mgr-test.cc
@@ -15,13 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <gutil/strings/substitute.h>
-#include <sys/stat.h>
 #include <boost/bind.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>
 #include <boost/filesystem.hpp>
+#include <boost/regex.hpp>
 #include <boost/scoped_ptr.hpp>
 #include <boost/thread/thread.hpp>
+#include <gutil/strings/substitute.h>
+#include <sys/stat.h>
 
 #include "codegen/llvm-codegen.h"
 #include "common/init.h"
@@ -37,6 +38,7 @@
 #include "testutil/gtest-util.h"
 #include "util/cpu-info.h"
 #include "util/disk-info.h"
+#include "util/error-util.h"
 #include "util/filesystem-util.h"
 #include "util/promise.h"
 #include "util/test-info.h"
@@ -49,6 +51,7 @@
 
 using boost::filesystem::directory_iterator;
 using boost::filesystem::remove;
+using boost::regex;
 
 // Note: This is the default scratch dir created by impala.
 // FLAGS_scratch_dirs + TmpFileMgr::TMP_SUB_DIR_NAME.
@@ -100,7 +103,7 @@ class BufferedBlockMgrTest : public ::testing::Test {
       created_tmp_dirs_.push_back(dir);
     }
     test_env_->InitTmpFileMgr(tmp_dirs, false);
-    EXPECT_EQ(num_dirs, test_env_->tmp_file_mgr()->num_active_tmp_devices());
+    EXPECT_EQ(num_dirs, test_env_->tmp_file_mgr()->NumActiveTmpDevices());
     return tmp_dirs;
   }
 
@@ -254,9 +257,7 @@ class BufferedBlockMgrTest : public ::testing::Test {
   }
 
   static bool AllWritesComplete(BufferedBlockMgr* block_mgr) {
-    RuntimeProfile::Counter* writes_outstanding =
-        block_mgr->profile()->GetCounter("BlockWritesOutstanding");
-    return writes_outstanding->value() == 0;
+    return block_mgr->GetNumWritesOutstanding() == 0;
   }
 
   static bool AllWritesComplete(const vector<BufferedBlockMgr*>& block_mgrs) {
@@ -266,13 +267,12 @@ class BufferedBlockMgrTest : public ::testing::Test {
     return true;
   }
 
-  // Delete the temporary file backing a block - all subsequent writes to the 
file
-  // should fail. Expects backing file has already been allocated.
-  static void DeleteBackingFile(BufferedBlockMgr::Block* block) {
-    const string& path = block->TmpFilePath();
-    ASSERT_GT(path.size(), 0);
-    ASSERT_TRUE(remove(path));
-    LOG(INFO) << "Injected fault by deleting file " << path;
+  // Remove permissions for the temporary file at 'path' - all subsequent 
writes
+  // to the file should fail. Expects backing file has already been allocated.
+  static void DisableBackingFile(const string& path) {
+    EXPECT_GT(path.size(), 0);
+    EXPECT_EQ(0, chmod(path.c_str(), 0));
+    LOG(INFO) << "Injected fault by removing file permissions " << path;
   }
 
   // Check that the file backing the block has dir as a prefix of its path.
@@ -910,9 +910,11 @@ void BufferedBlockMgrTest::TestRuntimeStateTeardown(
     UnpinBlocks(blocks);
     vector<BufferedBlockMgr::Block*> more_blocks;
     AllocateBlocks(block_mgr.get(), client, max_num_buffers, &more_blocks);
+
+    const string& tmp_file_path = blocks[0]->TmpFilePath();
     DeleteBlocks(more_blocks);
     PinBlocks(blocks);
-    DeleteBackingFile(blocks[0]);
+    DisableBackingFile(tmp_file_path);
   }
 
   // Unpin will initiate writes. If the write error propagates fast enough, 
some Unpin()
@@ -968,14 +970,15 @@ TEST_F(BufferedBlockMgrTest, 
WriteCompleteWithCancelledRuntimeState) {
   DeleteBlocks(blocks);
 }
 
-// Clear scratch directory. Return # of files deleted.
-static int clear_scratch_dir() {
+// Remove write permissions on scratch files. Return # of scratch files.
+static int remove_scratch_perms() {
   int num_files = 0;
   directory_iterator dir_it(SCRATCH_DIR);
   for (; dir_it != directory_iterator(); ++dir_it) {
     ++num_files;
-    remove_all(dir_it->path());
+    chmod(dir_it->path().c_str(), 0);
   }
+
   return num_files;
 }
 
@@ -997,7 +1000,7 @@ TEST_F(BufferedBlockMgrTest, WriteError) {
   // Repin the blocks
   PinBlocks(blocks);
   // Remove the backing storage so that future writes will fail
-  int num_files = clear_scratch_dir();
+  int num_files = remove_scratch_perms();
   ASSERT_GT(num_files, 0);
   UnpinBlocks(blocks, true);
   WaitForWrites(block_mgr);
@@ -1024,23 +1027,25 @@ TEST_F(BufferedBlockMgrTest, TmpFileAllocateError) {
   ASSERT_OK(blocks[0]->Unpin());
   WaitForWrites(block_mgr);
   // Remove temporary files - subsequent operations will fail.
-  int num_files = clear_scratch_dir();
-  ASSERT_GT(num_files, 0);
-  // Current implementation will fail here because it tries to expand the tmp 
file
-  // immediately. This behavior is not contractual but we want to know if it 
changes
-  // accidentally.
-  Status status = blocks[1]->Unpin();
-  ASSERT_FALSE(status.ok());
+  int num_files = remove_scratch_perms();
+  ASSERT_TRUE(num_files > 0);
+  // Current implementation will not fail here until it attempts to write the 
file.
+  // This behavior is not contractual but we want to know if it changes 
accidentally.
+  ASSERT_OK(blocks[1]->Unpin());
+
+  // Write failure should cancel query
+  WaitForWrites(block_mgr);
+  ASSERT_TRUE(block_mgr->IsCancelled());
 
   DeleteBlocks(blocks);
   TearDownMgrs();
 }
 
 // Test that the block manager is able to blacklist a temporary device 
correctly after a
-// write error. We should not allocate more blocks on that device, but 
existing blocks
-// on the device will remain in use.
-/// Disabled because blacklisting was disabled as workaround for IMPALA-2305.
-TEST_F(BufferedBlockMgrTest, DISABLED_WriteErrorBlacklist) {
+// write error. The query that encountered the write error should not allocate 
more
+// blocks on that device, but existing blocks on the device will remain in use 
and future
+// queries will use the device.
+TEST_F(BufferedBlockMgrTest, WriteErrorBlacklist) {
   // Set up two buffered block managers with two temporary dirs.
   vector<string> tmp_dirs = InitMultipleTmpDirs(2);
   // Simulate two concurrent queries.
@@ -1074,50 +1079,71 @@ TEST_F(BufferedBlockMgrTest, 
DISABLED_WriteErrorBlacklist) {
   // Delete one file from first scratch dir for first block manager.
   BufferedBlockMgr::Block* error_block = FindBlockForDir(blocks[error_mgr], 
error_dir);
   ASSERT_TRUE(error_block != NULL) << "Expected a tmp file in dir " << 
error_dir;
+  const string& error_file_path = error_block->TmpFilePath();
   PinBlocks(all_blocks);
-  DeleteBackingFile(error_block);
-  UnpinBlocks(all_blocks); // Should succeed since tmp file space was already 
allocated.
+  DisableBackingFile(error_file_path);
+  UnpinBlocks(all_blocks); // Should succeed since writes occur asynchronously
   WaitForWrites(block_mgrs);
-  ASSERT_TRUE(block_mgrs[error_mgr]->IsCancelled());
+  // Both block managers have a usable tmp directory so should still be usable.
+  ASSERT_FALSE(block_mgrs[error_mgr]->IsCancelled());
   ASSERT_FALSE(block_mgrs[no_error_mgr]->IsCancelled());
-  // Temporary device with error should no longer be active.
+  // Temporary device with error should still be active.
   vector<TmpFileMgr::DeviceId> active_tmp_devices =
-      test_env_->tmp_file_mgr()->active_tmp_devices();
-  ASSERT_EQ(tmp_dirs.size() - 1, active_tmp_devices.size());
+      test_env_->tmp_file_mgr()->ActiveTmpDevices();
+  ASSERT_EQ(tmp_dirs.size(), active_tmp_devices.size());
   for (int i = 0; i < active_tmp_devices.size(); ++i) {
     const string& device_path =
         test_env_->tmp_file_mgr()->GetTmpDirPath(active_tmp_devices[i]);
     ASSERT_EQ(string::npos, error_dir.find(device_path));
   }
-  // The second block manager should continue using allocated scratch space, 
since it
-  // didn't encounter a write error itself. In future this could change but 
for now it is
-  // the intended behaviour.
+
+  // The error block manager should only allocate from the device that had no 
error.
+  // The non-error block manager should continue using both devices, since it 
didn't
+  // encounter a write error itself.
+  vector<BufferedBlockMgr::Block*> error_new_blocks;
+  AllocateBlocks(
+      block_mgrs[error_mgr], clients[error_mgr], blocks_per_mgr, 
&error_new_blocks);
+  UnpinBlocks(error_new_blocks);
+  WaitForWrites(block_mgrs);
+  EXPECT_TRUE(FindBlockForDir(error_new_blocks, good_dir) != NULL);
+  EXPECT_TRUE(FindBlockForDir(error_new_blocks, error_dir) == NULL);
+  for (int i = 0; i < error_new_blocks.size(); ++i) {
+    LOG(INFO) << "Newly created block backed by file "
+              << error_new_blocks[i]->TmpFilePath();
+    EXPECT_TRUE(BlockInDir(error_new_blocks[i], good_dir));
+  }
+  DeleteBlocks(error_new_blocks);
+
   PinBlocks(blocks[no_error_mgr]);
   UnpinBlocks(blocks[no_error_mgr]);
-  ASSERT_TRUE(FindBlockForDir(blocks[no_error_mgr], good_dir) != NULL);
-  ASSERT_TRUE(FindBlockForDir(blocks[no_error_mgr], error_dir) != NULL);
-  // The second block manager should avoid using bad directory for new blocks.
+  WaitForWrites(block_mgrs);
+  EXPECT_TRUE(FindBlockForDir(blocks[no_error_mgr], good_dir) != NULL);
+  EXPECT_TRUE(FindBlockForDir(blocks[no_error_mgr], error_dir) != NULL);
+
+  // The second block manager should use the bad directory for new blocks since
+  // blacklisting is per-manager, not global.
   vector<BufferedBlockMgr::Block*> no_error_new_blocks;
   AllocateBlocks(block_mgrs[no_error_mgr], clients[no_error_mgr], 
blocks_per_mgr,
       &no_error_new_blocks);
   UnpinBlocks(no_error_new_blocks);
-  for (int i = 0; i < no_error_new_blocks.size(); ++i) {
-    LOG(INFO) << "Newly created block backed by file "
-              << no_error_new_blocks[i]->TmpFilePath();
-    ASSERT_TRUE(BlockInDir(no_error_new_blocks[i], good_dir));
-  }
-  // A new block manager should only use the good dir for backing storage.
+  WaitForWrites(block_mgrs);
+  EXPECT_TRUE(FindBlockForDir(no_error_new_blocks, good_dir) != NULL);
+  EXPECT_TRUE(FindBlockForDir(no_error_new_blocks, error_dir) != NULL);
+  DeleteBlocks(no_error_new_blocks);
+
+  // A new block manager should use the both dirs for backing storage.
   BufferedBlockMgr::Client* new_client;
   BufferedBlockMgr* new_block_mgr =
       CreateMgrAndClient(9999, blocks_per_mgr, block_size_, 0, false, 
&new_client);
   vector<BufferedBlockMgr::Block*> new_mgr_blocks;
   AllocateBlocks(new_block_mgr, new_client, blocks_per_mgr, &new_mgr_blocks);
   UnpinBlocks(new_mgr_blocks);
-  for (int i = 0; i < blocks_per_mgr; ++i) {
-    LOG(INFO) << "New manager Block " << i << " backed by file "
-              << new_mgr_blocks[i]->TmpFilePath();
-    ASSERT_TRUE(BlockInDir(new_mgr_blocks[i], good_dir));
-  }
+  WaitForWrites(block_mgrs);
+  EXPECT_TRUE(FindBlockForDir(new_mgr_blocks, good_dir) != NULL);
+  EXPECT_TRUE(FindBlockForDir(new_mgr_blocks, error_dir) != NULL);
+  DeleteBlocks(new_mgr_blocks);
+
+  DeleteBlocks(all_blocks);
 }
 
 // Check that allocation error resulting from removal of directory results in 
blocks
@@ -1151,7 +1177,7 @@ TEST_F(BufferedBlockMgrTest, AllocationErrorHandling) {
   // use the good dir.
   UnpinBlocks(blocks[0]);
   // Directories remain on active list even when they experience errors.
-  ASSERT_EQ(2, test_env_->tmp_file_mgr()->num_active_tmp_devices());
+  ASSERT_EQ(2, test_env_->tmp_file_mgr()->NumActiveTmpDevices());
   // Blocks should not be written to bad dir even if it remains non-writable.
   UnpinBlocks(blocks[1]);
   // All writes should succeed.
@@ -1165,18 +1191,39 @@ TEST_F(BufferedBlockMgrTest, AllocationErrorHandling) {
 TEST_F(BufferedBlockMgrTest, NoDirsAllocationError) {
   vector<string> tmp_dirs = InitMultipleTmpDirs(2);
   int max_num_buffers = 2;
+  RuntimeState* runtime_state;
   BufferedBlockMgr::Client* client;
-  BufferedBlockMgr* block_mgr =
-      CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, &client);
+  BufferedBlockMgr* block_mgr = CreateMgrAndClient(
+      0, max_num_buffers, block_size_, 0, false, &client, &runtime_state);
   vector<BufferedBlockMgr::Block*> blocks;
   AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
   for (int i = 0; i < tmp_dirs.size(); ++i) {
     const string& tmp_scratch_subdir = tmp_dirs[i] + SCRATCH_SUFFIX;
     chmod(tmp_scratch_subdir.c_str(), 0);
   }
+  ErrorLogMap error_log;
+  runtime_state->GetErrors(&error_log);
+  ASSERT_TRUE(error_log.empty());
   for (int i = 0; i < blocks.size(); ++i) {
-    ASSERT_FALSE(blocks[i]->Unpin().ok());
+    // Writes won't fail until the actual I/O is attempted.
+    ASSERT_OK(blocks[i]->Unpin());
   }
+
+  LOG(INFO) << "Waiting for writes.";
+  // Write failure should cancel query.
+  WaitForWrites(block_mgr);
+  LOG(INFO) << "writes done.";
+  ASSERT_TRUE(block_mgr->IsCancelled());
+  runtime_state->GetErrors(&error_log);
+  ASSERT_FALSE(error_log.empty());
+  stringstream error_string;
+  PrintErrorMap(&error_string, error_log);
+  LOG(INFO) << "Errors: " << error_string.str();
+  ASSERT_NE(
+      string::npos, error_string.str().find("No usable scratch files: space 
could "
+                                            "not be allocated in any of the 
configured "
+                                            "scratch directories 
(--scratch_dirs)"))
+      << error_string.str();
   DeleteBlocks(blocks);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/buffered-block-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.cc 
b/be/src/runtime/buffered-block-mgr.cc
index 0c3d25f..d4e14a2 100644
--- a/be/src/runtime/buffered-block-mgr.cc
+++ b/be/src/runtime/buffered-block-mgr.cc
@@ -29,12 +29,9 @@
 
 #include <gutil/strings/substitute.h>
 
-DEFINE_bool(disk_spill_encryption, false, "Set this to encrypt and perform an 
integrity "
-  "check on all data spilled to disk during a query");
-
 #include "common/names.h"
 
-using namespace strings;   // for Substitute
+using namespace strings; // for Substitute
 
 namespace impala {
 
@@ -132,11 +129,8 @@ BufferedBlockMgr::Block::Block(BufferedBlockMgr* block_mgr)
   : buffer_desc_(NULL),
     block_mgr_(block_mgr),
     client_(NULL),
-    write_range_(NULL),
-    tmp_file_(NULL),
     valid_data_len_(0),
-    num_rows_(0) {
-}
+    num_rows_(0) {}
 
 Status BufferedBlockMgr::Block::Pin(bool* pinned, Block* release_block, bool 
unpin) {
   return block_mgr_->PinBlock(this, pinned, release_block, unpin);
@@ -185,8 +179,8 @@ bool BufferedBlockMgr::Block::Validate() const {
 }
 
 string BufferedBlockMgr::Block::TmpFilePath() const {
-  if (tmp_file_ == NULL) return "";
-  return tmp_file_->path();
+  if (write_handle_ == NULL) return "";
+  return write_handle_->TmpFilePath();
 }
 
 string BufferedBlockMgr::Block::DebugString() const {
@@ -200,6 +194,9 @@ string BufferedBlockMgr::Block::DebugString() const {
      << "  Pinned: " << is_pinned_ << endl
      << "  Write Issued: " << in_write_ << endl
      << "  Client Local: " << client_local_ << endl;
+  if (write_handle_ != NULL) {
+    ss << "  Write handle: " << write_handle_->DebugString() << endl;
+  }
   if (client_ != NULL) ss << "  Client: " << client_->DebugString();
   return ss.str();
 }
@@ -208,7 +205,7 @@ BufferedBlockMgr::BufferedBlockMgr(RuntimeState* state, 
TmpFileMgr* tmp_file_mgr
     int64_t block_size, int64_t scratch_limit)
   : max_block_size_(block_size),
     // Keep two writes in flight per scratch disk so the disks can stay busy.
-    block_write_threshold_(tmp_file_mgr->num_active_tmp_devices() * 2),
+    block_write_threshold_(tmp_file_mgr->NumActiveTmpDevices() * 2),
     disable_spill_(state->query_ctx().disable_spilling || 
block_write_threshold_ == 0
         || scratch_limit == 0),
     query_id_(state->query_id()),
@@ -217,7 +214,6 @@ BufferedBlockMgr::BufferedBlockMgr(RuntimeState* state, 
TmpFileMgr* tmp_file_mgr
     total_pinned_buffers_(0),
     non_local_outstanding_writes_(0),
     tmp_file_group_(NULL),
-    io_mgr_(state->io_mgr()),
     is_cancelled_(false),
     writes_issued_(0),
     debug_write_delay_ms_(0) {}
@@ -356,7 +352,7 @@ bool BufferedBlockMgr::ConsumeMemory(Client* client, 
int64_t size) {
   // If we either couldn't acquire enough buffers or WriteUnpinnedBlocks() 
failed, undo
   // the reservation.
   if (buffers_acquired != buffers_needed || !status.ok()) {
-    if (!status.ok()) {
+    if (!status.ok() && !status.IsCancelled()) {
       VLOG_QUERY << "Query: " << query_id_ << " write unpinned buffers 
failed.";
       client->state_->LogError(status.msg());
     }
@@ -388,8 +384,6 @@ void BufferedBlockMgr::Cancel() {
     if (is_cancelled_) return;
     is_cancelled_ = true;
   }
-  // Cancel the underlying io mgr to unblock any waiting threads.
-  io_mgr_->CancelContext(io_request_context_);
 }
 
 bool BufferedBlockMgr::IsCancelled() {
@@ -548,14 +542,13 @@ BufferedBlockMgr::~BufferedBlockMgr() {
   // Do not do that with 'static_block_mgrs_lock_' held.
   other_mgr_ptr.reset();
 
-  if (io_request_context_ != NULL) 
io_mgr_->UnregisterContext(io_request_context_);
+  // Delete tmp files and cancel any in-flight writes.
+  tmp_file_group_->Close();
 
   // If there are any outstanding writes and we are here it means that when the
   // WriteComplete() callback gets executed it is going to access invalid 
memory.
   // See IMPALA-1890.
   DCHECK_EQ(non_local_outstanding_writes_, 0) << endl << DebugInternal();
-  // Delete tmp files.
-  tmp_file_group_->Close();
 
   // Validate that clients deleted all of their blocks. Since all writes have
   // completed at this point, any deleted blocks should be in unused_blocks_.
@@ -591,6 +584,13 @@ MemTracker* BufferedBlockMgr::get_tracker(Client* client) 
const {
   return client->tracker_;
 }
 
+int64_t BufferedBlockMgr::GetNumWritesOutstanding() {
+  // Acquire lock to avoid returning mid-way through WriteComplete() when the
+  // state may be inconsistent.
+  lock_guard<mutex> lock(lock_);
+  return profile()->GetCounter("BlockWritesOutstanding")->value();
+}
+
 Status BufferedBlockMgr::DeleteOrUnpinBlock(Block* block, bool unpin) {
   if (block == NULL) {
     return IsCancelled() ? Status::CANCELLED : Status::OK();
@@ -619,8 +619,10 @@ Status BufferedBlockMgr::PinBlock(Block* block, bool* 
pinned, Block* release_blo
   if (!status.ok()) goto error;
   *pinned = block->is_pinned_;
 
-  // Block was not evicted or had no data, nothing left to do.
-  if (in_mem || block->valid_data_len_ == 0) {
+  if (in_mem) {
+    // The block's buffer is still in memory with the original data.
+    status = CancelWrite(block);
+    if (!status.ok()) goto error;
     return DeleteOrUnpinBlock(release_block, unpin);
   }
 
@@ -628,6 +630,9 @@ Status BufferedBlockMgr::PinBlock(Block* block, bool* 
pinned, Block* release_blo
     if (release_block == NULL) return Status::OK();
 
     if (block->buffer_desc_ != NULL) {
+      // The block's buffer is still in memory but we couldn't get an 
additional buffer
+      // because it would eat into another client's reservation. However, we 
can use
+      // release_block's reservation, so reclaim the buffer.
       {
         lock_guard<mutex> lock(lock_);
         if (free_io_buffers_.Contains(block->buffer_desc_)) {
@@ -646,9 +651,12 @@ Status BufferedBlockMgr::PinBlock(Block* block, bool* 
pinned, Block* release_blo
         status = WriteUnpinnedBlocks();
         if (!status.ok()) goto error;
       }
+      status = CancelWrite(block);
+      if (!status.ok()) goto error;
       return DeleteOrUnpinBlock(release_block, unpin);
     }
-
+    // FindBufferForBlock() wasn't able to find a buffer so transfer the one 
from
+    // 'release_block'.
     status = TransferBuffer(block, release_block, unpin);
     if (!status.ok()) goto error;
     DCHECK(!release_block->is_pinned_);
@@ -657,33 +665,14 @@ Status BufferedBlockMgr::PinBlock(Block* block, bool* 
pinned, Block* release_blo
     *pinned = true;
   }
 
-  DCHECK(block->write_range_ != NULL) << block->DebugString() << endl << 
release_block;
-
-  {
-    // Read the block from disk if it was not in memory.
-    SCOPED_TIMER(disk_read_timer_);
-    // Create a ScanRange to perform the read.
-    DiskIoMgr::ScanRange* scan_range =
-        obj_pool_.Add(new DiskIoMgr::ScanRange());
-    scan_range->Reset(NULL, block->write_range_->file(), 
block->write_range_->len(),
-        block->write_range_->offset(), block->write_range_->disk_id(), false,
-        DiskIoMgr::BufferOpts::ReadInto(block->buffer(), block->buffer_len()));
-    DiskIoMgr::BufferDescriptor* io_mgr_buffer;
-    status = io_mgr_->Read(io_request_context_, scan_range, &io_mgr_buffer);
-    if (!status.ok()) goto error;
-
-    DCHECK_EQ(io_mgr_buffer->buffer(), block->buffer());
-    DCHECK_EQ(io_mgr_buffer->len(), block->valid_data_len());
-    DCHECK(io_mgr_buffer->eosr());
-    io_mgr_buffer->Return();
-  }
+  DCHECK(block->write_handle_ != NULL) << block->DebugString() << endl << 
release_block;
 
-  if (FLAGS_disk_spill_encryption) {
-    // Decryption is done in-place, since the buffer can't be accessed by 
anyone else.
-    status = CheckHashAndDecrypt(block);
+  // The block is on disk - read it back into memory.
+  if (block->valid_data_len() > 0) {
+    status = tmp_file_group_->Read(block->write_handle_.get(), 
block->valid_data());
     if (!status.ok()) goto error;
   }
-
+  tmp_file_group_->DestroyWriteHandle(move(block->write_handle_));
   return DeleteOrUnpinBlock(release_block, unpin);
 
 error:
@@ -693,6 +682,24 @@ error:
   return status;
 }
 
+Status BufferedBlockMgr::CancelWrite(Block* block) {
+  {
+    unique_lock<mutex> lock(lock_);
+    DCHECK(block->buffer_desc_ != NULL);
+    // If there is an in-flight write, wait for it to finish. This is 
sub-optimal
+    // compared to just cancelling the write, but reduces the number of 
possible
+    // code paths in this legacy code.
+    WaitForWrite(lock, block);
+    if (is_cancelled_) return Status::CANCELLED;
+  }
+  if (block->write_handle_ != NULL) {
+    // Restore the in-memory data without reading from disk (e.g. decrypt it).
+    RETURN_IF_ERROR(tmp_file_group_->CancelWriteAndRestoreData(
+        move(block->write_handle_), block->valid_data()));
+  }
+  return Status::OK();
+}
+
 Status BufferedBlockMgr::UnpinBlock(Block* block) {
   DCHECK(!block->is_deleted_) << "Unpin for deleted block.";
 
@@ -738,49 +745,17 @@ Status BufferedBlockMgr::WriteUnpinnedBlock(Block* block) 
{
   // Assumes block manager lock is already taken.
   DCHECK(!block->is_pinned_) << block->DebugString();
   DCHECK(!block->in_write_) << block->DebugString();
+  DCHECK(block->write_handle_ == NULL) << block->DebugString();
   DCHECK_EQ(block->buffer_desc_->len, max_block_size_);
 
-  if (block->write_range_ == NULL) {
-    if (tmp_file_group_->NumFiles() == 0) {
-      RETURN_IF_ERROR(tmp_file_group_->CreateFiles(query_id_));
-    }
+  // The block is on disk - read it back into memory.
+  RETURN_IF_ERROR(tmp_file_group_->Write(block->valid_data(),
+      [this, block](const Status& write_status) { WriteComplete(block, 
write_status); },
+      &block->write_handle_));
 
-    // First time the block is being persisted - need to allocate tmp file 
space.
-    TmpFileMgr::File* tmp_file;
-    int64_t file_offset;
-    RETURN_IF_ERROR(
-        tmp_file_group_->AllocateSpace(max_block_size_, &tmp_file, 
&file_offset));
-    int disk_id = tmp_file->disk_id();
-    if (disk_id < 0) {
-      // Assign a valid disk id to the write range if the tmp file was not 
assigned one.
-      static unsigned int next_disk_id = 0;
-      disk_id = ++next_disk_id;
-    }
-    disk_id %= io_mgr_->num_local_disks();
-    DiskIoMgr::WriteRange::WriteDoneCallback callback =
-        bind(mem_fn(&BufferedBlockMgr::WriteComplete), this, block, _1);
-    block->write_range_ = obj_pool_.Add(new DiskIoMgr::WriteRange(
-        tmp_file->path(), file_offset, disk_id, callback));
-    block->tmp_file_ = tmp_file;
-  }
-
-  uint8_t* outbuf = NULL;
-  if (FLAGS_disk_spill_encryption) {
-    // The block->buffer() could be accessed during the write path, so we have 
to
-    // make a copy of it while writing.
-    RETURN_IF_ERROR(EncryptAndHash(block, &outbuf));
-  } else {
-    outbuf = block->buffer();
-  }
-
-  block->write_range_->SetData(outbuf, block->valid_data_len_);
-
-  // Issue write through DiskIoMgr.
-  RETURN_IF_ERROR(io_mgr_->AddWriteRange(io_request_context_, 
block->write_range_));
   block->in_write_ = true;
   DCHECK(block->Validate()) << endl << block->DebugString();
   outstanding_writes_counter_->Add(1);
-  bytes_written_counter_->Add(block->valid_data_len_);
   ++writes_issued_;
   if (writes_issued_ == 1) {
     if (ImpaladMetrics::NUM_QUERIES_SPILLED != NULL) {
@@ -805,25 +780,22 @@ void BufferedBlockMgr::WriteComplete(Block* block, const 
Status& write_status) {
 #endif
   Status status = Status::OK();
   lock_guard<mutex> lock(lock_);
-  outstanding_writes_counter_->Add(-1);
   DCHECK(Validate()) << endl << DebugInternal();
   DCHECK(is_cancelled_ || block->in_write_) << "WriteComplete() for block not 
in write."
-                                            << endl << block->DebugString();
+                                            << endl
+                                            << block->DebugString();
   DCHECK(block->buffer_desc_ != NULL);
+
+  outstanding_writes_counter_->Add(-1);
   if (!block->client_local_) {
     DCHECK_GT(non_local_outstanding_writes_, 0) << block->DebugString();
     --non_local_outstanding_writes_;
   }
   block->in_write_ = false;
 
-  // Explicitly release our temporarily allocated buffer here so that it 
doesn't
-  // hang around needlessly.
-  if (FLAGS_disk_spill_encryption) EncryptedWriteComplete(block);
-
   // ReturnUnusedBlock() will clear the block, so save required state in local 
vars.
   // state is not valid if the block was deleted because the state may be torn 
down
   // after the state's fragment has deleted all of its blocks.
-  TmpFileMgr::File* tmp_file = block->tmp_file_;
   RuntimeState* state = block->is_deleted_ ? NULL : block->client_->state_;
 
   // If the block was re-pinned when it was in the IOMgr queue, don't free it.
@@ -847,18 +819,17 @@ void BufferedBlockMgr::WriteComplete(Block* block, const 
Status& write_status) {
 
   if (!write_status.ok() || !status.ok() || is_cancelled_) {
     VLOG_FILE << "Query: " << query_id_ << ". Write did not complete 
successfully: "
-        "write_status=" << write_status.GetDetail() << ", status=" << 
status.GetDetail()
-        << ". is_cancelled_=" << is_cancelled_;
-
+                                           "write_status="
+              << write_status.GetDetail() << ", status=" << status.GetDetail()
+              << ". is_cancelled_=" << is_cancelled_;
     // If the instance is already cancelled, don't confuse things with these 
errors.
     if (!write_status.ok() && !write_status.IsCancelled()) {
       // Report but do not attempt to recover from write error.
-      DCHECK(tmp_file != NULL);
-      if (!write_status.IsMemLimitExceeded()) 
tmp_file->ReportIOError(write_status.msg());
       VLOG_QUERY << "Query: " << query_id_ << " write complete callback with 
error.";
+
       if (state != NULL) state->LogError(write_status.msg());
     }
-    if (!status.ok()) {
+    if (!status.ok() && !status.IsCancelled()) {
       VLOG_QUERY << "Query: " << query_id_ << " error while writing unpinned 
blocks.";
       if (state != NULL) state->LogError(status.msg());
     }
@@ -875,6 +846,7 @@ void BufferedBlockMgr::WriteComplete(Block* block, const 
Status& write_status) {
   if (!block->client_local_) buffer_available_cv_.notify_all();
   if (block->is_deleted_) {
     // Finish the DeleteBlock() work.
+    tmp_file_group_->DestroyWriteHandle(move(block->write_handle_));
     block->buffer_desc_->block = NULL;
     block->buffer_desc_ = NULL;
     ReturnUnusedBlock(block);
@@ -913,7 +885,9 @@ void BufferedBlockMgr::DeleteBlockLocked(const 
unique_lock<mutex>& lock, Block*
   if (block->in_write_) {
     DCHECK(block->buffer_desc_ != NULL && block->buffer_desc_->len == 
max_block_size_)
         << "Should never be writing a small buffer";
-    // If a write is still pending, return. Cleanup will be done in 
WriteComplete().
+    // If a write is still pending, cancel it and return. Cleanup will be done 
in
+    // WriteComplete(). Cancelling the write ensures that it won't try to log 
to the
+    // RuntimeState (which may be torn down before the block manager).
     DCHECK(block->Validate()) << endl << block->DebugString();
     return;
   }
@@ -935,6 +909,12 @@ void BufferedBlockMgr::DeleteBlockLocked(const 
unique_lock<mutex>& lock, Block*
       block->buffer_desc_ = NULL;
     }
   }
+
+  // Discard any on-disk data. The write is finished so this won't call back 
into
+  // BufferedBlockMgr.
+  if (block->write_handle_ != NULL) {
+    tmp_file_group_->DestroyWriteHandle(move(block->write_handle_));
+  }
   ReturnUnusedBlock(block);
   DCHECK(block->Validate()) << endl << block->DebugString();
   DCHECK(Validate()) << endl << DebugInternal();
@@ -1224,7 +1204,7 @@ string BufferedBlockMgr::DebugString(Client* client) {
 
 string BufferedBlockMgr::DebugInternal() const {
   stringstream ss;
-  ss << "Buffered block mgr" << endl
+  ss << "Buffered block mgr " << this << endl
      << "  Num writes outstanding: " << outstanding_writes_counter_->value() 
<< endl
      << "  Num free io buffers: " << free_io_buffers_.size() << endl
      << "  Num unpinned blocks: " << unpinned_blocks_.size() << endl
@@ -1234,6 +1214,7 @@ string BufferedBlockMgr::DebugInternal() const {
      << "  Remaining memory: " << mem_tracker_->SpareCapacity()
      << " (#blocks=" << (mem_tracker_->SpareCapacity() / max_block_size_) << 
")" << endl
      << "  Block write threshold: " << block_write_threshold_;
+  if (tmp_file_group_ != NULL) ss << tmp_file_group_->DebugString();
   return ss.str();
 }
 
@@ -1243,13 +1224,11 @@ void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, 
TmpFileMgr* tmp_file_mgr,
   unique_lock<mutex> l(lock_);
   if (initialized_) return;
 
-  io_mgr->RegisterContext(&io_request_context_, NULL);
-
   profile_.reset(new RuntimeProfile(&obj_pool_, "BlockMgr"));
   parent_profile->AddChild(profile_.get());
 
-  tmp_file_group_.reset(
-      new TmpFileMgr::FileGroup(tmp_file_mgr, profile_.get(), scratch_limit));
+  tmp_file_group_.reset(new TmpFileMgr::FileGroup(
+      tmp_file_mgr, io_mgr, profile_.get(), query_id_, max_block_size_, 
scratch_limit));
 
   mem_limit_counter_ = ADD_COUNTER(profile_.get(), "MemoryLimit", 
TUnit::BYTES);
   mem_limit_counter_->Set(mem_limit);
@@ -1257,13 +1236,10 @@ void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, 
TmpFileMgr* tmp_file_mgr,
   block_size_counter_->Set(max_block_size_);
   created_block_counter_ = ADD_COUNTER(profile_.get(), "BlocksCreated", 
TUnit::UNIT);
   recycled_blocks_counter_ = ADD_COUNTER(profile_.get(), "BlocksRecycled", 
TUnit::UNIT);
-  bytes_written_counter_ = ADD_COUNTER(profile_.get(), "BytesWritten", 
TUnit::BYTES);
   outstanding_writes_counter_ =
       ADD_COUNTER(profile_.get(), "BlockWritesOutstanding", TUnit::UNIT);
   buffered_pin_counter_ = ADD_COUNTER(profile_.get(), "BufferedPins", 
TUnit::UNIT);
-  disk_read_timer_ = ADD_TIMER(profile_.get(), "TotalReadBlockTime");
   buffer_wait_timer_ = ADD_TIMER(profile_.get(), "TotalBufferWaitTime");
-  encryption_timer_ = ADD_TIMER(profile_.get(), "TotalEncryptionTime");
 
   // Create a new mem_tracker and allocate buffers.
   mem_tracker_.reset(
@@ -1272,45 +1248,4 @@ void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, 
TmpFileMgr* tmp_file_mgr,
   initialized_ = true;
 }
 
-Status BufferedBlockMgr::EncryptAndHash(Block* block, uint8_t** outbuf) {
-  DCHECK(FLAGS_disk_spill_encryption);
-  DCHECK(block->buffer());
-  DCHECK(!block->is_pinned_);
-  DCHECK(!block->in_write_);
-  DCHECK(outbuf);
-  SCOPED_TIMER(encryption_timer_);
-  // Encrypt to a temporary buffer since so that the original data is still in 
the buffer
-  // if the block is re-pinned while the write is still in-flight.
-  block->encrypted_write_buffer_.reset(new uint8_t[block->valid_data_len_]);
-  // Since we're using AES-CFB mode, we must take care not to reuse a key/IV 
pair.
-  // Regenerate a new key and IV for every block of data we write, including 
between
-  // writes of the same Block.
-  block->key_.InitializeRandom();
-  RETURN_IF_ERROR(block->key_.Encrypt(
-      block->buffer(), block->valid_data_len_, 
block->encrypted_write_buffer_.get()));
-
-  block->hash_.Compute(block->encrypted_write_buffer_.get(), 
block->valid_data_len_);
-
-  *outbuf = block->encrypted_write_buffer_.get();
-  return Status::OK();
-}
-
-void BufferedBlockMgr::EncryptedWriteComplete(Block* block) {
-  DCHECK(FLAGS_disk_spill_encryption);
-  DCHECK(block->encrypted_write_buffer_.get());
-  block->encrypted_write_buffer_.reset();
-}
-
-Status BufferedBlockMgr::CheckHashAndDecrypt(Block* block) {
-  DCHECK(FLAGS_disk_spill_encryption);
-  DCHECK(block->buffer());
-  SCOPED_TIMER(encryption_timer_);
-
-  if (!block->hash_.Verify(block->buffer(), block->valid_data_len_)) {
-    return Status("Block verification failure");
-  }
-  // Decrypt block->buffer() in-place. Safe because no one is accessing it.
-  return block->key_.Decrypt(block->buffer(), block->valid_data_len_, 
block->buffer());
-}
-
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/buffered-block-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.h 
b/be/src/runtime/buffered-block-mgr.h
index 269b707..bbe8429 100644
--- a/be/src/runtime/buffered-block-mgr.h
+++ b/be/src/runtime/buffered-block-mgr.h
@@ -20,7 +20,7 @@
 
 #include "runtime/disk-io-mgr.h"
 #include "runtime/tmp-file-mgr.h"
-#include "util/openssl-util.h"
+#include "util/mem-range.h"
 
 namespace impala {
 
@@ -175,6 +175,13 @@ class BufferedBlockMgr {
       return buffer_desc_->buffer;
     }
 
+    /// Returns a reference to the valid data in the block's buffer. Only 
guaranteed to
+    /// be valid if the block is pinned.
+    MemRange valid_data() const {
+      DCHECK(buffer_desc_ != NULL);
+      return MemRange(buffer_desc_->buffer, valid_data_len_);
+    }
+
     /// Return the number of bytes allocated in this block.
     int64_t valid_data_len() const { return valid_data_len_; }
 
@@ -223,17 +230,9 @@ class BufferedBlockMgr {
     /// The client that owns this block.
     Client* client_;
 
-    /// WriteRange object representing the on-disk location used to persist a 
block.
-    /// Is created the first time a block is persisted, and retained until the 
block
-    /// object is destroyed. The file location and offset in write_range_ are 
valid
-    /// throughout the lifetime of this object, but the data and length in the
-    /// write_range_ are only valid while the block is being written.
-    /// write_range_ instance is owned by the block manager.
-    DiskIoMgr::WriteRange* write_range_;
-
-    /// The file this block belongs to. The lifetime is the same as the file 
location
-    /// and offset in write_range_. The File is owned by BufferedBlockMgr, not 
TmpFileMgr.
-    TmpFileMgr::File* tmp_file_;
+    /// Non-NULL when the block data is written to scratch or is in the 
process of being
+    /// written.
+    std::unique_ptr<TmpFileMgr::WriteHandle> write_handle_;
 
     /// Length of valid (i.e. allocated) data within the block.
     int64_t valid_data_len_;
@@ -241,20 +240,6 @@ class BufferedBlockMgr {
     /// Number of rows in this block.
     int num_rows_;
 
-    /// If --disk_spill_encryption is on, in the write path we allocate a new 
buffer to
-    /// hold encrypted data while it's being written to disk. In the read 
path, we can
-    /// instead decrypt data in place since no one else because the read 
buffer isn't
-    /// accessible to any other threads until Pin() returns.
-    boost::scoped_array<uint8_t> encrypted_write_buffer_;
-
-    /// If --disk_spill_encryption is on, a AES 256-bit key and initialization 
vector.
-    /// Regenerated on each write.
-    EncryptionKey key_;
-
-    /// If --disk_spill_encryption is on, our hash of the data being written. 
Filled in
-    /// on writes; verified on reads. This is calculated _after_ encryption.
-    IntegrityHash hash_;
-
     /// Block state variables. The block's buffer can be freed only if 
is_pinned_ and
     /// in_write_ are both false.
 
@@ -335,8 +320,8 @@ class BufferedBlockMgr {
   ///   - If there is memory pressure, block will get the buffer from 
'unpin_block'.
   Status GetNewBlock(Client* client, Block* unpin_block, Block** block, 
int64_t len = -1);
 
-  /// Cancels the block mgr. All subsequent calls that return a Status fail 
with
-  /// Status::CANCELLED. Idempotent.
+  /// Test helper to cancel the block mgr. All subsequent calls that return a 
Status fail
+  /// with Status::CANCELLED. Idempotent.
   void Cancel();
 
   /// Returns true if the block manager was cancelled.
@@ -360,10 +345,6 @@ class BufferedBlockMgr {
   /// ReleaseMemory() call.
   void ReleaseMemory(Client* client, int64_t size);
 
-  /// The number of buffers available for client. That is, if all other 
clients were
-  /// stopped, the number of buffers this client could get.
-  int64_t available_buffers(Client* client) const;
-
   /// Returns a MEM_LIMIT_EXCEEDED error which includes the minimum memory 
required by
   /// this 'client' that acts on behalf of the node with id 'node_id'. 
'node_id' is used
   /// only for error reporting.
@@ -381,6 +362,7 @@ class BufferedBlockMgr {
   void set_debug_write_delay_ms(int val) { debug_write_delay_ms_ = val; }
 
  private:
+  friend class BufferedBlockMgrTest;
   friend struct Client;
 
   /// Descriptor for a single memory buffer in the pool.
@@ -415,6 +397,12 @@ class BufferedBlockMgr {
   void DeleteBlock(Block* block);
   void DeleteBlockLocked(const boost::unique_lock<boost::mutex>& lock, Block* 
block);
 
+  /// If there is an in-flight write, cancel the write and restore the 
contents of the
+  /// block's buffer. If no write has been started for 'block', does nothing. 
'block'
+  /// must have an associated buffer. Returns an error status if an error is 
encountered
+  /// while cancelling the write or CANCELLED if the block mgr is cancelled.
+  Status CancelWrite(Block* block);
+
   /// If the 'block' is NULL, checks if cancelled and returns. Otherwise, 
depending on
   /// 'unpin' calls either  DeleteBlock() or UnpinBlock(), which both first 
check for
   /// cancellation. It should be called without the lock_ acquired.
@@ -428,6 +416,10 @@ class BufferedBlockMgr {
   /// The caller should not hold 'lock_'.
   Status TransferBuffer(Block* dst, Block* src, bool unpin);
 
+  /// The number of buffers available for client. That is, if all other 
clients were
+  /// stopped, the number of buffers this client could get.
+  int64_t available_buffers(Client* client) const;
+
   /// Returns the total number of unreserved buffers. This is the sum of 
unpinned,
   /// free and buffers we can still allocate minus the total number of 
reserved buffers
   /// that are not pinned.
@@ -461,8 +453,8 @@ class BufferedBlockMgr {
   /// Issues the write for this block to the DiskIoMgr.
   Status WriteUnpinnedBlock(Block* block);
 
-  /// Wait until either the write for 'block' completes or the block mgr is 
cancelled.
-  /// 'lock_' must be held with 'lock'.
+  /// Wait until either there is no in-flight write for 'block' or the block 
mgr is
+  /// cancelled. 'lock_' must be held with 'lock'.
   void WaitForWrite(boost::unique_lock<boost::mutex>& lock, Block* block);
 
   /// Callback used by DiskIoMgr to indicate a block write has completed.  
write_status
@@ -481,6 +473,9 @@ class BufferedBlockMgr {
   /// Non-blocking and needs no lock_.
   Block* GetUnusedBlock(Client* client);
 
+  /// Test helper to get the number of block writes currently outstanding.
+  int64_t GetNumWritesOutstanding();
+
   /// Used to debug the state of the block manager. Lock must already be taken.
   bool Validate() const;
   std::string DebugInternal() const;
@@ -559,10 +554,6 @@ class BufferedBlockMgr {
   /// blocks may be written. Blocks are round-robined across these files.
   boost::scoped_ptr<TmpFileMgr::FileGroup> tmp_file_group_;
 
-  /// DiskIoMgr handles to read and write blocks.
-  DiskIoMgr* io_mgr_;
-  DiskIoRequestContext* io_request_context_;
-
   /// If true, a disk write failed and all API calls return.
   /// Status::CANCELLED. Set to true if there was an error writing a block, or 
if
   /// WriteComplete() needed to reissue the write and that failed.
@@ -584,21 +575,12 @@ class BufferedBlockMgr {
   /// Number of Pin() calls that did not require a disk read.
   RuntimeProfile::Counter* buffered_pin_counter_;
 
-  /// Time taken for disk reads.
-  RuntimeProfile::Counter* disk_read_timer_;
-
   /// Time spent waiting for a free buffer.
   RuntimeProfile::Counter* buffer_wait_timer_;
 
-  /// Number of bytes written to disk (includes writes still queued in the IO 
manager).
-  RuntimeProfile::Counter* bytes_written_counter_;
-
   /// Number of writes outstanding (issued but not completed).
   RuntimeProfile::Counter* outstanding_writes_counter_;
 
-  /// Time spent in disk spill encryption, decryption, and integrity checking.
-  RuntimeProfile::Counter* encryption_timer_;
-
   /// Number of writes issued.
   int writes_issued_;
 
@@ -609,21 +591,9 @@ class BufferedBlockMgr {
   /// map contains only weak ptrs. BufferedBlockMgrs that are handed out are 
shared ptrs.
   /// When all the shared ptrs are no longer referenced, the BufferedBlockMgr
   /// d'tor will be called at which point the weak ptr will be removed from 
the map.
-  typedef boost::unordered_map<TUniqueId, std::weak_ptr<BufferedBlockMgr>>
-      BlockMgrsMap;
+  typedef boost::unordered_map<TUniqueId, std::weak_ptr<BufferedBlockMgr>> 
BlockMgrsMap;
   static BlockMgrsMap query_to_block_mgrs_;
 
-  /// Takes the data in buffer(), allocates 'encrypted_write_buffer_', 
encrypts the data
-  /// into 'encrypted_write_buffer_' and computes 'hash_'. Returns a pointer 
to the
-  /// encrypted data in 'outbuf'.
-  Status EncryptAndHash(Block* block, uint8_t** outbuf);
-
-  /// Deallocates the block's encrypted write buffer alloced in 
EncryptAndHash().
-  void EncryptedWriteComplete(Block* block);
-
-  /// Verifies the integrity hash and decrypts the contents of buffer() in 
place.
-  Status CheckHashAndDecrypt(Block* block);
-
   /// Debug option to delay write completion.
   int debug_write_delay_ms_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-test.cc 
b/be/src/runtime/disk-io-mgr-test.cc
index 9f8d6d7..016b14f 100644
--- a/be/src/runtime/disk-io-mgr-test.cc
+++ b/be/src/runtime/disk-io-mgr-test.cc
@@ -239,12 +239,12 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
   read_io_mgr.reset();
 }
 
-// Perform invalid writes (e.g. non-existent file, negative offset) and 
validate
-// that an error status is returned via the write callback.
+// Perform invalid writes (e.g. file in non-existent directory, negative 
offset) and
+// validate that an error status is returned via the write callback.
 TEST_F(DiskIoMgrTest, InvalidWrite) {
   MemTracker mem_tracker(LARGE_MEM_LIMIT);
   num_ranges_written_ = 0;
-  string tmp_file = "/tmp/non-existent.txt";
+  string tmp_file = "/non-existent/file.txt";
   DiskIoMgr io_mgr(1, 1, 1, 10);
   ASSERT_OK(io_mgr.Init(&mem_tracker));
   DiskIoRequestContext* writer;
@@ -252,12 +252,12 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   int32_t* data = pool_->Add(new int32_t);
   *data = rand();
 
-  // Write to a non-existent file.
+  // Write to file in non-existent directory.
   DiskIoMgr::WriteRange** new_range = pool_->Add(new DiskIoMgr::WriteRange*);
   DiskIoMgr::WriteRange::WriteDoneCallback callback =
-      bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2,
-          new_range, (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL,
-          data, Status(TErrorCode::RUNTIME_ERROR, "Test Failure"), _1);
+      bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, new_range,
+          (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL, data,
+          Status(TErrorCode::RUNTIME_ERROR, "Test Failure"), _1);
   *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, rand(), 0, 
callback));
 
   (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 87ac33a..20cb9b5 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -78,6 +78,8 @@ static const int LOW_MEMORY = 64 * 1024 * 1024;
 
 const int DiskIoMgr::DEFAULT_QUEUE_CAPACITY = 2;
 
+AtomicInt32 DiskIoMgr::next_disk_id_;
+
 namespace detail {
 // Indicates if file handle caching should be used
 static inline bool is_file_handle_caching_enabled() {
@@ -262,6 +264,11 @@ void DiskIoMgr::BufferDescriptor::Return() {
 DiskIoMgr::WriteRange::WriteRange(
     const string& file, int64_t file_offset, int disk_id, WriteDoneCallback 
callback)
   : RequestRange(RequestType::WRITE), callback_(callback) {
+  SetRange(file, file_offset, disk_id);
+}
+
+void DiskIoMgr::WriteRange::SetRange(
+    const std::string& file, int64_t file_offset, int disk_id) {
   file_ = file;
   offset_ = file_offset;
   disk_id_ = disk_id;
@@ -947,8 +954,11 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, 
RequestRange** range,
   return false;
 }
 
-void DiskIoMgr::HandleWriteFinished(DiskIoRequestContext* writer, WriteRange* 
write_range,
-    const Status& write_status) {
+void DiskIoMgr::HandleWriteFinished(
+    DiskIoRequestContext* writer, WriteRange* write_range, const Status& 
write_status) {
+  // Copy disk_id before running callback: the callback may modify write_range.
+  int disk_id = write_range->disk_id_;
+
   // Execute the callback before decrementing the thread count. Otherwise 
CancelContext()
   // that waits for the disk ref count to be 0 will return, creating a race, 
e.g.
   // between BufferedBlockMgr::WriteComplete() and 
BufferedBlockMgr::~BufferedBlockMgr().
@@ -958,7 +968,7 @@ void DiskIoMgr::HandleWriteFinished(DiskIoRequestContext* 
writer, WriteRange* wr
   {
     unique_lock<mutex> writer_lock(writer->lock_);
     DCHECK(writer->Validate()) << endl << writer->DebugString();
-    DiskIoRequestContext::PerDiskState& state = 
writer->disk_states_[write_range->disk_id_];
+    DiskIoRequestContext::PerDiskState& state = writer->disk_states_[disk_id];
     if (writer->state_ == DiskIoRequestContext::Cancelled) {
       state.DecrementRequestThreadAndCheckDone(writer);
     } else {
@@ -1152,13 +1162,24 @@ DiskIoMgr::BufferDescriptor* 
DiskIoMgr::TryAllocateNextBufferForRange(
 }
 
 void DiskIoMgr::Write(DiskIoRequestContext* writer_context, WriteRange* 
write_range) {
-  FILE* file_handle = fopen(write_range->file(), "rb+");
-  Status ret_status;
-  if (file_handle == NULL) {
+  Status ret_status = Status::OK();
+  FILE* file_handle = NULL;
+  // Raw open() syscall will create file if not present when passed these 
flags.
+  int fd = open(write_range->file(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
+  if (fd < 0) {
     ret_status = Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
-        Substitute("fopen($0, \"rb+\") failed with errno=$1 description=$2",
-            write_range->file_, errno, GetStrErrMsg())));
+        Substitute("Opening '$0' for write failed with errno=$1 
description=$2",
+                                     write_range->file_, errno, 
GetStrErrMsg())));
   } else {
+    file_handle = fdopen(fd, "wb");
+    if (file_handle == NULL) {
+      ret_status = Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
+          Substitute("fdopen($0, \"wb\") failed with errno=$1 description=$2", 
fd, errno,
+                                       GetStrErrMsg())));
+    }
+  }
+
+  if (file_handle != NULL) {
     ret_status = WriteRangeHelper(file_handle, write_range);
 
     int success = fclose(file_handle);
@@ -1225,9 +1246,8 @@ int DiskIoMgr::AssignQueue(const char* file, int disk_id, 
bool expected_local) {
   // Assign to a local disk queue.
   DCHECK(!IsS3APath(file)); // S3 is always remote.
   if (disk_id == -1) {
-    // disk id is unknown, assign it a random one.
-    static int next_disk_id = 0;
-    disk_id = next_disk_id++;
+    // disk id is unknown, assign it an arbitrary one.
+    disk_id = next_disk_id_.Add(1);
   }
   // TODO: we need to parse the config for the number of dirs configured for 
this
   // data node.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index 4b650a8..c67f69d 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -15,10 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-
 #ifndef IMPALA_RUNTIME_DISK_IO_MGR_H
 #define IMPALA_RUNTIME_DISK_IO_MGR_H
 
+#include <functional>
 #include <list>
 #include <vector>
 
@@ -599,14 +599,22 @@ class DiskIoMgr {
     /// (TStatusCode::CANCELLED). The callback is only invoked if this 
WriteRange was
     /// successfully added (i.e. AddWriteRange() succeeded). No locks are held 
while
     /// the callback is invoked.
-    typedef boost::function<void (const Status&)> WriteDoneCallback;
+    typedef std::function<void(const Status&)> WriteDoneCallback;
     WriteRange(const std::string& file, int64_t file_offset, int disk_id,
         WriteDoneCallback callback);
 
+    /// Change the file and offset of this write range. Data and callbacks are 
unchanged.
+    /// Can only be called when the write is not in flight (i.e. before 
AddWriteRange()
+    /// is called or after the write callback was called).
+    void SetRange(const std::string& file, int64_t file_offset, int disk_id);
+
     /// Set the data and number of bytes to be written for this WriteRange.
-    /// File data can be over-written by calling SetData() and AddWriteRange().
+    /// Can only be called when the write is not in flight (i.e. before 
AddWriteRange()
+    /// is called or after the write callback was called).
     void SetData(const uint8_t* buffer, int64_t len);
 
+    const uint8_t* data() const { return data_; }
+
    private:
     friend class DiskIoMgr;
     friend class DiskIoRequestContext;
@@ -855,6 +863,11 @@ class DiskIoMgr {
   /// It is indexed by disk id.
   std::vector<DiskQueue*> disk_queues_;
 
+  /// The next disk queue to write to if the actual 'disk_id_' is unknown 
(i.e. the file
+  /// is not associated with a particular local disk or remote queue). Used to 
implement
+  /// round-robin assignment for that case.
+  static AtomicInt32 next_disk_id_;
+
   // Caching structure that maps file names to cached file handles. The cache 
has an upper
   // limit of entries defined by FLAGS_max_cached_file_handles. Evicted cached 
file
   // handles are closed.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index ad996e3..d93a459 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -171,7 +171,7 @@ ExecEnv::ExecEnv()
     scheduler_.reset(new SimpleScheduler(
         addresses, metrics_.get(), webserver_.get(), 
request_pool_service_.get()));
   }
-  if (exec_env_ == NULL) exec_env_ = this;
+  exec_env_ = this;
 }
 
 // TODO: Need refactor to get rid of duplicated code.
@@ -224,12 +224,10 @@ ExecEnv::ExecEnv(const string& hostname, int 
backend_port, int subscriber_port,
     scheduler_.reset(new SimpleScheduler(
         addresses, metrics_.get(), webserver_.get(), 
request_pool_service_.get()));
   }
-  if (exec_env_ == NULL) exec_env_ = this;
+  exec_env_ = this;
 }
 
-
-ExecEnv::~ExecEnv() {
-}
+ExecEnv::~ExecEnv() {}
 
 Status ExecEnv::InitForFeTests() {
   mem_tracker_.reset(new MemTracker(-1, "Process"));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 08ddd9f..be90a5a 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -61,7 +61,7 @@ class ExecEnv {
 
-  /// Returns the first created exec env instance. In a normal impalad, this is
+  /// Returns the most recently created exec env instance. In a normal impalad, this is
   /// the only instance. In test setups with multiple ExecEnv's per process,
-  /// we return the first instance.
+  /// we return the most recently created instance.
   static ExecEnv* GetInstance() { return exec_env_; }
 
   /// Empty destructor because the compiler-generated one requires full

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 2757750..def95c0 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -45,8 +45,6 @@ QueryState::QueryState(const TQueryCtx& query_ctx)
   // how many are distinct. It is defined as the sum of the number of generic 
errors and
   // the number of distinct other errors.
   if (query_options.max_errors <= 0) {
-    // TODO: fix linker error and uncomment this
-    //query_options_.max_errors = FLAGS_max_errors;
     query_options.max_errors = 100;
   }
   if (query_options.batch_size <= 0) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/tmp-file-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-internal.h 
b/be/src/runtime/tmp-file-mgr-internal.h
new file mode 100644
index 0000000..dd8bd07
--- /dev/null
+++ b/be/src/runtime/tmp-file-mgr-internal.h
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_TMP_FILE_MGR_INTERNAL_H
+#define IMPALA_RUNTIME_TMP_FILE_MGR_INTERNAL_H
+
+#include <string>
+
+#include "runtime/tmp-file-mgr.h"
+
+namespace impala {
+
+/// File is a handle to a physical file in a temporary directory. File space
+/// can be allocated and files removed using AllocateSpace() and Remove(). Used
+/// internally by TmpFileMgr.
+///
+/// Creation of the physical file in the file system is deferred until the 
file is
+/// written by DiskIoMgr.
+///
+/// Methods of File are not thread-safe.
+class TmpFileMgr::File {
+ public:
+  File(FileGroup* file_group, DeviceId device_id, const std::string& path);
+
+  /// Allocates 'num_bytes' bytes in this file for a new block of data.
+  /// The file size is increased by a call to truncate() if necessary.
+  /// Returns Status::OK() and sets 'offset' to the file offset of the first
+  /// byte in the allocated range on success.
+  /// Returns an error status if an unexpected error occurs, e.g. the file 
could not
+  /// be created.
+  Status AllocateSpace(int64_t num_bytes, int64_t* offset);
+
+  /// Called when an IO error is encountered for this file. Logs the error and 
blacklists
+  /// the file.
+  void Blacklist(const ErrorMsg& msg);
+
+  /// Delete the physical file on disk, if one was created.
+  /// It is not valid to read or write to a file after calling Remove().
+  Status Remove();
+
+  /// Get the disk ID that should be used for IO mgr queueing.
+  int AssignDiskQueue() const;
+
+  const std::string& path() const { return path_; }
+  bool is_blacklisted() const { return blacklisted_; }
+
+  std::string DebugString();
+
+ private:
+  friend class TmpFileMgrTest;
+  /// The name of the sub-directory that Impala creates within each configured 
scratch
+  /// directory.
+  const static std::string TMP_SUB_DIR_NAME;
+
+  /// Space (in MB) that must ideally be available for writing on a scratch
+  /// directory. A warning is issued if available space is less than this 
threshold.
+  const static uint64_t AVAILABLE_SPACE_THRESHOLD_MB;
+
+  /// The FileGroup this belongs to. Cannot be null.
+  FileGroup* const file_group_;
+
+  /// Path of the physical file in the filesystem.
+  const std::string path_;
+
+  /// The temporary device this file is stored on.
+  const DeviceId device_id_;
+
+  /// The id of the disk on which the physical file lies.
+  const int disk_id_;
+
+  /// Current bytes allocated in the file. Modified by AllocateSpace().
+  int64_t bytes_allocated_;
+
+  /// Set to true to indicate that we shouldn't allocate any more space in 
this file.
+  bool blacklisted_;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc 
b/be/src/runtime/tmp-file-mgr-test.cc
index 4deefd5..61fd682 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -19,12 +19,16 @@
 
 #include <boost/filesystem.hpp>
 #include <boost/scoped_ptr.hpp>
+#include <boost/thread/locks.hpp>
 #include <gtest/gtest.h>
 
 #include "common/init.h"
+#include "runtime/test-env.h"
+#include "runtime/tmp-file-mgr-internal.h"
 #include "runtime/tmp-file-mgr.h"
 #include "service/fe-support.h"
 #include "testutil/gtest-util.h"
+#include "util/condition-variable.h"
 #include "util/filesystem-util.h"
 #include "util/metrics.h"
 
@@ -37,22 +41,27 @@ using boost::filesystem::path;
 namespace impala {
 
 class TmpFileMgrTest : public ::testing::Test {
- protected:
+ public:
   virtual void SetUp() {
     metrics_.reset(new MetricGroup("tmp-file-mgr-test"));
     profile_ = obj_pool_.Add(new RuntimeProfile(&obj_pool_, 
"tmp-file-mgr-test"));
+    test_env_.reset(new TestEnv);
+    cb_counter_ = 0;
   }
 
   virtual void TearDown() {
+    test_env_.reset();
     metrics_.reset();
     obj_pool_.Clear();
   }
 
+  DiskIoMgr* io_mgr() { return test_env_->exec_env()->disk_io_mgr(); }
+
   /// Check that metric values are consistent with TmpFileMgr state.
   void CheckMetrics(TmpFileMgr* tmp_file_mgr) {
-    vector<TmpFileMgr::DeviceId> active = tmp_file_mgr->active_tmp_devices();
-    IntGauge* active_metric = metrics_->FindMetricForTesting<IntGauge>(
-        "tmp-file-mgr.active-scratch-dirs");
+    vector<TmpFileMgr::DeviceId> active = tmp_file_mgr->ActiveTmpDevices();
+    IntGauge* active_metric =
+        
metrics_->FindMetricForTesting<IntGauge>("tmp-file-mgr.active-scratch-dirs");
     EXPECT_EQ(active.size(), active_metric->value());
     SetMetric<string>* active_set_metric =
         metrics_->FindMetricForTesting<SetMetric<string>>(
@@ -71,54 +80,118 @@ class TmpFileMgrTest : public ::testing::Test {
     }
   }
 
-  /// Helper to call the private NewFile() method.
-  static Status NewFile(TmpFileMgr::FileGroup* group,
-      const TmpFileMgr::DeviceId& device_id, const TUniqueId& query_id,
-      TmpFileMgr::File** new_file) {
-    return group->NewFile(device_id, query_id, new_file);
+  /// Helper to call the private CreateFiles() method and return
+  /// the created files.
+  static Status CreateFiles(
+      TmpFileMgr::FileGroup* group, vector<TmpFileMgr::File*>* files) {
+    // The method expects the lock to be held.
+    lock_guard<SpinLock> lock(group->lock_);
+    RETURN_IF_ERROR(group->CreateFiles());
+    for (unique_ptr<TmpFileMgr::File>& file : group->tmp_files_) {
+      files->push_back(file.get());
+    }
+    return Status::OK();
+  }
+
+  /// Helper to call the private TmpFileMgr::NewFile() method.
+  static Status NewFile(TmpFileMgr* mgr, TmpFileMgr::FileGroup* group,
+      TmpFileMgr::DeviceId device_id, unique_ptr<TmpFileMgr::File>* new_file) {
+    return mgr->NewFile(group, device_id, new_file);
   }
 
-  /// Helper to call the private TmpFile::AllocateSpace() method.
-  static Status AllocateSpace(
+  /// Helper to call the private File::AllocateSpace() method.
+  static Status FileAllocateSpace(
       TmpFileMgr::File* file, int64_t num_bytes, int64_t* offset) {
     return file->AllocateSpace(num_bytes, offset);
   }
 
+  /// Helper to call the private FileGroup::AllocateSpace() method.
+  static Status GroupAllocateSpace(TmpFileMgr::FileGroup* group, int64_t num_bytes,
+      TmpFileMgr::File** file, int64_t* offset) {
+    return group->AllocateSpace(num_bytes, file, offset);
+  }
+
+  /// Helper to set FileGroup::next_allocation_index_.
+  static void SetNextAllocationIndex(TmpFileMgr::FileGroup* group, int value) {
+    group->next_allocation_index_ = value;
+  }
+
+  /// Helper to get the # of bytes allocated by the group. Validates that the sum across
+  /// all files equals this total.
+  static int64_t BytesAllocated(TmpFileMgr::FileGroup* group) {
+    int64_t bytes_allocated = 0;
+    for (unique_ptr<TmpFileMgr::File>& file : group->tmp_files_) {
+      bytes_allocated += file->bytes_allocated_;
+    }
+    EXPECT_EQ(bytes_allocated, group->current_bytes_allocated_);
+    return bytes_allocated;
+  }
+
+  // Write callback, which signals 'cb_cv_' and increments 'cb_counter_'.
+  void SignalCallback(Status write_status) {
+    {
+      lock_guard<mutex> lock(cb_cv_lock_);
+      ++cb_counter_;
+    }
+    cb_cv_.NotifyAll();
+  }
+
+  /// Wait until 'cb_counter_' reaches 'val'.
+  void WaitForCallbacks(int64_t val) {
+    unique_lock<mutex> lock(cb_cv_lock_);
+    while (cb_counter_ < val) cb_cv_.Wait(lock);
+  }
+
   ObjectPool obj_pool_;
   scoped_ptr<MetricGroup> metrics_;
   // Owned by 'obj_pool_'.
   RuntimeProfile* profile_;
+
+  /// Used for DiskIoMgr.
+  scoped_ptr<TestEnv> test_env_;
+
+  // Variables used by SignalCallback().
+  mutex cb_cv_lock_;
+  ConditionVariable cb_cv_;
+  int64_t cb_counter_;
 };
 
 /// Regression test for IMPALA-2160. Verify that temporary file manager allocates blocks
-/// at the expected file offsets and expands the temporary file to the correct size.
+/// at the expected file offsets.
 TEST_F(TmpFileMgrTest, TestFileAllocation) {
   TmpFileMgr tmp_file_mgr;
   ASSERT_OK(tmp_file_mgr.Init(metrics_.get()));
-  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, profile_);
+  TUniqueId id;
+  TmpFileMgr::FileGroup file_group(
+      &tmp_file_mgr, io_mgr(), profile_, id, 1024 * 1024 * 8);
+
   // Default configuration should give us one temporary device.
-  EXPECT_EQ(1, tmp_file_mgr.num_active_tmp_devices());
-  vector<TmpFileMgr::DeviceId> tmp_devices = tmp_file_mgr.active_tmp_devices();
+  EXPECT_EQ(1, tmp_file_mgr.NumActiveTmpDevices());
+  vector<TmpFileMgr::DeviceId> tmp_devices = tmp_file_mgr.ActiveTmpDevices();
   EXPECT_EQ(1, tmp_devices.size());
-  TUniqueId id;
-  TmpFileMgr::File* file;
-  ASSERT_OK(NewFile(&file_group, tmp_devices[0], id, &file));
-  EXPECT_TRUE(file != NULL);
+  vector<TmpFileMgr::File*> files;
+  ASSERT_OK(CreateFiles(&file_group, &files));
+  ASSERT_EQ(1, files.size());
+  TmpFileMgr::File* file = files[0];
   // Apply writes of variable sizes and check space was allocated correctly.
   int64_t write_sizes[] = {1, 10, 1024, 4, 1024 * 1024 * 8, 1024 * 1024 * 8, 16, 10};
   int num_write_sizes = sizeof(write_sizes) / sizeof(write_sizes[0]);
   int64_t next_offset = 0;
   for (int i = 0; i < num_write_sizes; ++i) {
     int64_t offset;
-    ASSERT_OK(AllocateSpace(file, write_sizes[i], &offset));
+    ASSERT_OK(FileAllocateSpace(file, write_sizes[i], &offset));
     EXPECT_EQ(next_offset, offset);
     next_offset = offset + write_sizes[i];
-    EXPECT_EQ(next_offset, boost::filesystem::file_size(file->path()));
   }
   // Check that cleanup is correct.
   string file_path = file->path();
-  file_group.Close();
   EXPECT_FALSE(boost::filesystem::exists(file_path));
+
+  // Check that the file is cleaned up correctly. Need to create file first since
+  // tmp file is only allocated on writes. Use 'file_path': Close() may free 'file'.
+  EXPECT_OK(FileSystemUtil::CreateFile(file->path()));
+  file_group.Close();
+  EXPECT_FALSE(boost::filesystem::exists(file_path));
   CheckMetrics(&tmp_file_mgr);
 }
 
@@ -129,15 +202,18 @@ TEST_F(TmpFileMgrTest, TestOneDirPerDevice) {
   RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
   tmp_file_mgr.InitCustom(tmp_dirs, true, metrics_.get());
-  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, profile_);
+  TUniqueId id;
+  TmpFileMgr::FileGroup file_group(
+      &tmp_file_mgr, io_mgr(), profile_, id, 1024 * 1024 * 8);
 
   // Only the first directory should be used.
-  EXPECT_EQ(1, tmp_file_mgr.num_active_tmp_devices());
-  vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.active_tmp_devices();
+  EXPECT_EQ(1, tmp_file_mgr.NumActiveTmpDevices());
+  vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.ActiveTmpDevices();
   EXPECT_EQ(1, devices.size());
-  TUniqueId id;
-  TmpFileMgr::File* file;
-  ASSERT_OK(NewFile(&file_group, devices[0], id, &file));
+  vector<TmpFileMgr::File*> files;
+  ASSERT_OK(CreateFiles(&file_group, &files));
+  ASSERT_EQ(1, files.size());
+  TmpFileMgr::File* file = files[0];
   // Check the prefix is the expected temporary directory.
   EXPECT_EQ(0, file->path().find(tmp_dirs[0]));
   FileSystemUtil::RemovePaths(tmp_dirs);
@@ -151,19 +227,22 @@ TEST_F(TmpFileMgrTest, TestMultiDirsPerDevice) {
   RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
   tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get());
-  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, profile_);
+  TUniqueId id;
+  TmpFileMgr::FileGroup file_group(
+      &tmp_file_mgr, io_mgr(), profile_, id, 1024 * 1024 * 8);
 
   // Both directories should be used.
-  EXPECT_EQ(2, tmp_file_mgr.num_active_tmp_devices());
-  vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.active_tmp_devices();
+  EXPECT_EQ(2, tmp_file_mgr.NumActiveTmpDevices());
+  vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.ActiveTmpDevices();
   EXPECT_EQ(2, devices.size());
-  for (int i = 0; i < tmp_dirs.size(); ++i) {
+
+  vector<TmpFileMgr::File*> files;
+  ASSERT_OK(CreateFiles(&file_group, &files));
+  ASSERT_EQ(2, files.size());
+  for (int i = 0; i < 2; ++i) {
     EXPECT_EQ(0, tmp_file_mgr.GetTmpDirPath(devices[i]).find(tmp_dirs[i]));
-    TUniqueId id;
-    TmpFileMgr::File* file;
-    ASSERT_OK(NewFile(&file_group, devices[i], id, &file));
     // Check the prefix is the expected temporary directory.
-    EXPECT_EQ(0, file->path().find(tmp_dirs[i]));
+    EXPECT_EQ(0, files[i]->path().find(tmp_dirs[i]));
   }
   FileSystemUtil::RemovePaths(tmp_dirs);
   file_group.Close();
@@ -171,79 +250,79 @@ TEST_F(TmpFileMgrTest, TestMultiDirsPerDevice) {
 }
 
 /// Test that reporting a write error is possible but does not result in
-/// blacklisting, which is disabled.
+/// blacklisting the device.
 TEST_F(TmpFileMgrTest, TestReportError) {
   vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"});
   RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
   tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get());
-  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, profile_);
+  TUniqueId id;
+  TmpFileMgr::FileGroup file_group(
+      &tmp_file_mgr, io_mgr(), profile_, id, 1024 * 1024 * 8);
 
   // Both directories should be used.
-  vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.active_tmp_devices();
+  vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.ActiveTmpDevices();
   EXPECT_EQ(2, devices.size());
   CheckMetrics(&tmp_file_mgr);
 
   // Inject an error on one device so that we can validate it is handled correctly.
-  TUniqueId id;
   int good_device = 0, bad_device = 1;
-  TmpFileMgr::File* bad_file;
-  ASSERT_OK(NewFile(&file_group, devices[bad_device], id, &bad_file));
+  vector<TmpFileMgr::File*> files;
+  ASSERT_OK(CreateFiles(&file_group, &files));
+  ASSERT_EQ(2, files.size());
+  TmpFileMgr::File* good_file = files[good_device];
+  TmpFileMgr::File* bad_file = files[bad_device];
   ErrorMsg errmsg(TErrorCode::GENERAL, "A fake error");
-  bad_file->ReportIOError(errmsg);
+  bad_file->Blacklist(errmsg);
 
-  // Blacklisting is disabled.
-  EXPECT_FALSE(bad_file->is_blacklisted());
-  // The second device should still be active.
-  EXPECT_EQ(2, tmp_file_mgr.num_active_tmp_devices());
-  vector<TmpFileMgr::DeviceId> devices_after = tmp_file_mgr.active_tmp_devices();
+  // File-level blacklisting is enabled but not device-level.
+  EXPECT_TRUE(bad_file->is_blacklisted());
+  // The bad device should still be active.
+  EXPECT_EQ(2, tmp_file_mgr.NumActiveTmpDevices());
+  vector<TmpFileMgr::DeviceId> devices_after = tmp_file_mgr.ActiveTmpDevices();
   EXPECT_EQ(2, devices_after.size());
   CheckMetrics(&tmp_file_mgr);
 
   // Attempts to expand bad file should succeed.
   int64_t offset;
-  ASSERT_OK(AllocateSpace(bad_file, 128, &offset));
+  ASSERT_OK(FileAllocateSpace(bad_file, 128, &offset));
   // The good device should still be usable.
-  TmpFileMgr::File* good_file;
-  ASSERT_OK(NewFile(&file_group, devices[good_device], id, &good_file));
-  EXPECT_TRUE(good_file != NULL);
-  ASSERT_OK(AllocateSpace(good_file, 128, &offset));
+  ASSERT_OK(FileAllocateSpace(good_file, 128, &offset));
   // Attempts to allocate new files on bad device should succeed.
-  ASSERT_OK(NewFile(&file_group, devices[bad_device], id, &bad_file));
+  unique_ptr<TmpFileMgr::File> bad_file2;
+  ASSERT_OK(NewFile(&tmp_file_mgr, &file_group, devices[bad_device], &bad_file2));
   FileSystemUtil::RemovePaths(tmp_dirs);
   file_group.Close();
   CheckMetrics(&tmp_file_mgr);
 }
 
-TEST_F(TmpFileMgrTest, TestAllocateFails) {
-  string tmp_dir("/tmp/tmp-file-mgr-test.1");
-  string scratch_subdir = tmp_dir + "/impala-scratch";
-  vector<string> tmp_dirs({tmp_dir});
+TEST_F(TmpFileMgrTest, TestAllocateNonWritable) {
+  vector<string> tmp_dirs;
+  vector<string> scratch_subdirs;
+  for (int i = 0; i < 2; ++i) {
+    tmp_dirs.push_back(Substitute("/tmp/tmp-file-mgr-test.$0", i));
+    scratch_subdirs.push_back(tmp_dirs[i] + "/impala-scratch");
+  }
   RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
   tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get());
-  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, profile_);
-
   TUniqueId id;
-  TmpFileMgr::File* allocated_file1;
-  TmpFileMgr::File* allocated_file2;
+  TmpFileMgr::FileGroup file_group(
+      &tmp_file_mgr, io_mgr(), profile_, id, 1024 * 1024 * 8);
+
+  vector<TmpFileMgr::File*> allocated_files;
+  ASSERT_OK(CreateFiles(&file_group, &allocated_files));
   int64_t offset;
-  ASSERT_OK(NewFile(&file_group, 0, id, &allocated_file1));
-  ASSERT_OK(NewFile(&file_group, 0, id, &allocated_file2));
-  ASSERT_OK(AllocateSpace(allocated_file1, 1, &offset));
+  ASSERT_OK(FileAllocateSpace(allocated_files[0], 1, &offset));
 
-  // Make scratch non-writable and test for allocation errors at different stages:
+  // Make scratch non-writable and test allocation at different stages:
   // new file creation, files with no allocated blocks. files with allocated space.
-  chmod(scratch_subdir.c_str(), 0);
-  // allocated_file1 already has space allocated.
-  EXPECT_FALSE(AllocateSpace(allocated_file1, 1, &offset).ok());
-  // allocated_file2 has no space allocated.
-  EXPECT_FALSE(AllocateSpace(allocated_file2, 1, &offset).ok());
-  // Creating a new File object can succeed because it is not immediately created on disk.
-  TmpFileMgr::File* unallocated_file;
-  ASSERT_OK(NewFile(&file_group, 0, id, &unallocated_file));
-
-  chmod(scratch_subdir.c_str(), S_IRWXU);
+  // No errors should be encountered during allocation since allocation is purely logical.
+  chmod(scratch_subdirs[0].c_str(), 0);
+  ASSERT_OK(FileAllocateSpace(allocated_files[0], 1, &offset));
+  ASSERT_OK(FileAllocateSpace(allocated_files[1], 1, &offset));
+
+  chmod(scratch_subdirs[0].c_str(), S_IRWXU);
   FileSystemUtil::RemovePaths(tmp_dirs);
   file_group.Close();
 }
@@ -256,51 +335,86 @@ TEST_F(TmpFileMgrTest, TestScratchLimit) {
   tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get());
 
   const int64_t LIMIT = 100;
-  const int64_t FILE1_ALLOC = 25;
-  const int64_t FILE2_ALLOC = LIMIT - FILE1_ALLOC;
-  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, profile_, LIMIT);
-  TmpFileMgr::File* file1;
-  TmpFileMgr::File* file2;
+  const int64_t ALLOC_SIZE = 50;
   TUniqueId id;
-  ASSERT_OK(NewFile(&file_group, 0, id, &file1));
-  ASSERT_OK(NewFile(&file_group, 1, id, &file2));
+  TmpFileMgr::FileGroup file_group(
+      &tmp_file_mgr, io_mgr(), profile_, id, ALLOC_SIZE, LIMIT);
+
+  vector<TmpFileMgr::File*> files;
+  ASSERT_OK(CreateFiles(&file_group, &files));
 
   // Test individual limit is enforced.
   Status status;
   int64_t offset;
   TmpFileMgr::File* alloc_file;
-  // Alloc from both files should fail.
-  for (int i = 0; i <= 1; ++i) {
-    status = file_group.AllocateSpace(LIMIT + 1, &alloc_file, &offset);
-    ASSERT_FALSE(status.ok());
-    ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED);
-  }
 
   // Alloc from file 1 should succeed.
-  ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED);
-  ASSERT_OK(file_group.AllocateSpace(FILE1_ALLOC, &alloc_file, &offset));
-  ASSERT_EQ(alloc_file, file1); // Should select files round-robin.
+  SetNextAllocationIndex(&file_group, 0);
+  ASSERT_OK(GroupAllocateSpace(&file_group, ALLOC_SIZE, &alloc_file, &offset));
+  ASSERT_EQ(alloc_file, files[0]); // Should select files round-robin.
   ASSERT_EQ(0, offset);
 
-  // Test aggregate limit is enforced on both files.
-  for (int i = 0; i <= 1; ++i) {
-    status = file_group.AllocateSpace(FILE2_ALLOC + 1, &alloc_file, &offset);
-    ASSERT_FALSE(status.ok());
-    ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED);
-  }
-
   // Allocate up to the max.
-  ASSERT_OK(file_group.AllocateSpace(FILE2_ALLOC, &alloc_file, &offset));
+  ASSERT_OK(GroupAllocateSpace(&file_group, ALLOC_SIZE, &alloc_file, &offset));
   ASSERT_EQ(0, offset);
-  ASSERT_EQ(alloc_file, file2);
+  ASSERT_EQ(alloc_file, files[1]);
 
-  // Test aggregate limit still enforced.
-  status = file_group.AllocateSpace(1, &alloc_file, &offset);
+  // Test aggregate limit is enforced.
+  status = GroupAllocateSpace(&file_group, 1, &alloc_file, &offset);
   ASSERT_FALSE(status.ok());
   ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED);
 
   file_group.Close();
 }
+
+// Test that scratch file ranges are recycled as expected.
+TEST_F(TmpFileMgrTest, TestScratchRangeRecycling) {
+  const int64_t ALLOC_SIZE = 50;
+  TUniqueId id;
+  TmpFileMgr::FileGroup file_group(
+      test_env_->tmp_file_mgr(), io_mgr(), profile_, id, ALLOC_SIZE);
+
+  // Generate some data.
+  const int BLOCKS = 5;
+  vector<vector<uint8_t>> data(BLOCKS);
+  for (int i = 0; i < BLOCKS; ++i) {
+    data[i].resize(ALLOC_SIZE);
+    std::iota(data[i].begin(), data[i].end(), i);
+  }
+
+  DiskIoMgr::WriteRange::WriteDoneCallback callback =
+      bind(mem_fn(&TmpFileMgrTest::SignalCallback), this, _1);
+  vector<unique_ptr<TmpFileMgr::WriteHandle>> handles(BLOCKS);
+  // Make sure free space doesn't grow over several iterations.
+  const int TEST_ITERS = 5;
+  for (int i = 0; i < TEST_ITERS; ++i) {
+    cb_counter_ = 0;
+    for (int j = 0; j < BLOCKS; ++j) {
+      ASSERT_OK(
+          file_group.Write(MemRange(data[j].data(), ALLOC_SIZE), callback, &handles[j]));
+    }
+    WaitForCallbacks(BLOCKS);
+    EXPECT_EQ(ALLOC_SIZE * BLOCKS, BytesAllocated(&file_group));
+
+    // Read back and validate.
+    for (int j = 0; j < BLOCKS; ++j) {
+      uint8_t tmp[ALLOC_SIZE];
+      ASSERT_OK(file_group.Read(handles[j].get(), MemRange(tmp, ALLOC_SIZE)));
+      EXPECT_EQ(0, memcmp(tmp, data[j].data(), ALLOC_SIZE));
+      file_group.DestroyWriteHandle(move(handles[j]));
+    }
+    // Check that the space is still in use - it should be recycled by the next iteration.
+    EXPECT_EQ(ALLOC_SIZE * BLOCKS, BytesAllocated(&file_group));
+  }
+
+  file_group.Close();
+  test_env_->TearDownRuntimeStates();
+}
 }
 
-IMPALA_TEST_MAIN();
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport();
+  return RUN_ALL_TESTS();
+}

Reply via email to