http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/buffered-block-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr-test.cc 
b/be/src/runtime/buffered-block-mgr-test.cc
deleted file mode 100644
index cb294c2..0000000
--- a/be/src/runtime/buffered-block-mgr-test.cc
+++ /dev/null
@@ -1,1547 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <boost/bind.hpp>
-#include <boost/date_time/posix_time/posix_time.hpp>
-#include <boost/filesystem.hpp>
-#include <boost/regex.hpp>
-#include <boost/scoped_ptr.hpp>
-#include <boost/thread/thread.hpp>
-#include <gutil/strings/substitute.h>
-#include <sys/stat.h>
-
-#include "codegen/llvm-codegen.h"
-#include "common/init.h"
-#include "common/object-pool.h"
-#include "runtime/buffered-block-mgr.h"
-#include "runtime/disk-io-mgr.h"
-#include "runtime/exec-env.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/query-state.h"
-#include "runtime/runtime-state.h"
-#include "runtime/test-env.h"
-#include "runtime/tmp-file-mgr.h"
-#include "service/fe-support.h"
-#include "testutil/gtest-util.h"
-#include "util/cpu-info.h"
-#include "util/disk-info.h"
-#include "util/error-util.h"
-#include "util/filesystem-util.h"
-#include "util/promise.h"
-#include "util/test-info.h"
-#include "util/time.h"
-
-#include "gen-cpp/Types_types.h"
-#include "gen-cpp/ImpalaInternalService_types.h"
-
-#include "common/names.h"
-
-using boost::filesystem::directory_iterator;
-using boost::filesystem::remove;
-using boost::regex;
-
-// Note: This is the default scratch dir created by impala.
-// FLAGS_scratch_dirs + TmpFileMgr::TMP_SUB_DIR_NAME.
-const string SCRATCH_DIR = "/tmp/impala-scratch";
-
-// This suffix is appended to a tmp dir
-const string SCRATCH_SUFFIX = "/impala-scratch";
-
-// Number of millieconds to wait to ensure write completes. We don't know for 
sure how
-// slow the disk will be, so this is much higher than we expect the writes to 
take.
-const static int WRITE_WAIT_MILLIS = 10000;
-
-// How often to check for write completion
-const static int WRITE_CHECK_INTERVAL_MILLIS = 10;
-
-DECLARE_bool(disk_spill_encryption);
-
-namespace impala {
-
-class BufferedBlockMgrTest : public ::testing::Test {
- protected:
-  const static int block_size_ = 1024;
-
-  virtual void SetUp() {
-    test_env_.reset(new TestEnv());
-    ASSERT_OK(test_env_->Init());
-  }
-
-  virtual void TearDown() {
-    TearDownMgrs();
-    test_env_.reset();
-
-    // Tests modify permissions, so make sure we can delete if they didn't 
clean up.
-    for (int i = 0; i < created_tmp_dirs_.size(); ++i) {
-      chmod((created_tmp_dirs_[i] + SCRATCH_SUFFIX).c_str(), S_IRWXU);
-    }
-    FileSystemUtil::RemovePaths(created_tmp_dirs_);
-    created_tmp_dirs_.clear();
-    pool_.Clear();
-  }
-
-  /// Reinitialize test_env_ to have multiple temporary directories.
-  vector<string> InitMultipleTmpDirs(int num_dirs) {
-    vector<string> tmp_dirs;
-    for (int i = 0; i < num_dirs; ++i) {
-      const string& dir = Substitute("/tmp/buffered-block-mgr-test.$0", i);
-      // Fix permissions in case old directories were left from previous runs 
of test.
-      chmod((dir + SCRATCH_SUFFIX).c_str(), S_IRWXU);
-      EXPECT_OK(FileSystemUtil::RemoveAndCreateDirectory(dir));
-      tmp_dirs.push_back(dir);
-      created_tmp_dirs_.push_back(dir);
-    }
-    test_env_.reset(new TestEnv);
-    test_env_->SetTmpFileMgrArgs(tmp_dirs, false);
-    EXPECT_OK(test_env_->Init());
-    EXPECT_EQ(num_dirs, test_env_->tmp_file_mgr()->NumActiveTmpDevices());
-    return tmp_dirs;
-  }
-
-  static void ValidateBlock(BufferedBlockMgr::Block* block, int32_t data) {
-    ASSERT_EQ(block->valid_data_len(), sizeof(int32_t));
-    ASSERT_EQ(*reinterpret_cast<int32_t*>(block->buffer()), data);
-  }
-
-  static int32_t* MakeRandomSizeData(BufferedBlockMgr::Block* block) {
-    // Format is int32_t size, followed by size bytes of data
-    int32_t size = (rand() % 252) + 4; // So blocks have 4-256 bytes of data
-    uint8_t* data = block->Allocate<uint8_t>(size);
-    *(reinterpret_cast<int32_t*>(data)) = size;
-    int i;
-    for (i = 4; i < size-5; ++i) {
-      data[i] = i;
-    }
-    for (; i < size; ++i) {  // End marker of at least 5 0xff's
-      data[i] = 0xff;
-    }
-    return reinterpret_cast<int32_t*>(data);  // Really returns a pointer to 
size
-  }
-
-  static void ValidateRandomSizeData(BufferedBlockMgr::Block* block, int32_t 
size) {
-    int32_t bsize = *(reinterpret_cast<int32_t*>(block->buffer()));
-    uint8_t* data = reinterpret_cast<uint8_t*>(block->buffer());
-    int i;
-    ASSERT_EQ(block->valid_data_len(), size);
-    ASSERT_EQ(size, bsize);
-    for (i = 4; i < size - 5; ++i) {
-      ASSERT_EQ(data[i], i);
-    }
-    for (; i < size; ++i) {
-      ASSERT_EQ(data[i], 0xff);
-    }
-  }
-
-  /// Helper to create a simple block manager.
-  BufferedBlockMgr* CreateMgr(int64_t query_id, int max_buffers, int 
block_size,
-      RuntimeState** query_state = NULL, TQueryOptions* query_options = NULL) {
-    RuntimeState* state;
-    EXPECT_OK(test_env_->CreateQueryStateWithBlockMgr(
-        query_id, max_buffers, block_size, query_options, &state));
-    if (query_state != NULL) *query_state = state;
-    return state->block_mgr();
-  }
-
-  /// Create a new client tracker as a child of the RuntimeState's instance 
tracker.
-  MemTracker* NewClientTracker(RuntimeState* state) {
-    return pool_.Add(new MemTracker(-1, "client", 
state->instance_mem_tracker()));
-  }
-
-  BufferedBlockMgr* CreateMgrAndClient(int64_t query_id, int max_buffers, int 
block_size,
-      int reserved_blocks, bool tolerates_oversubscription,
-      BufferedBlockMgr::Client** client, RuntimeState** query_state = NULL,
-      TQueryOptions* query_options = NULL) {
-    RuntimeState* state;
-    BufferedBlockMgr* mgr =
-        CreateMgr(query_id, max_buffers, block_size, &state, query_options);
-
-    MemTracker* client_tracker = NewClientTracker(state);
-    EXPECT_OK(mgr->RegisterClient(Substitute("Client for query $0", query_id),
-        reserved_blocks, tolerates_oversubscription, client_tracker, state, 
client));
-    EXPECT_TRUE(client != NULL);
-    if (query_state != NULL) *query_state = state;
-    return mgr;
-  }
-
-  void CreateMgrsAndClients(int64_t start_query_id, int num_mgrs, int 
buffers_per_mgr,
-      int block_size, int reserved_blocks_per_client, bool 
tolerates_oversubscription,
-      vector<BufferedBlockMgr*>* mgrs, vector<BufferedBlockMgr::Client*>* 
clients) {
-    for (int i = 0; i < num_mgrs; ++i) {
-      BufferedBlockMgr::Client* client;
-      BufferedBlockMgr* mgr = CreateMgrAndClient(start_query_id + i, 
buffers_per_mgr,
-          block_size_, reserved_blocks_per_client, tolerates_oversubscription, 
&client);
-      mgrs->push_back(mgr);
-      clients->push_back(client);
-    }
-  }
-
-  // Destroy all created query states and associated block managers.
-  void TearDownMgrs() {
-    // Tear down the query states, which DCHECKs that the memory consumption of
-    // the query's trackers is zero.
-    test_env_->TearDownQueries();
-  }
-
-  void AllocateBlocks(BufferedBlockMgr* block_mgr, BufferedBlockMgr::Client* 
client,
-      int num_blocks, vector<BufferedBlockMgr::Block*>* blocks) {
-    int32_t* data;
-    Status status;
-    BufferedBlockMgr::Block* new_block;
-    for (int i = 0; i < num_blocks; ++i) {
-      ASSERT_OK(block_mgr->GetNewBlock(client, NULL, &new_block));
-      ASSERT_TRUE(new_block != NULL);
-      data = new_block->Allocate<int32_t>(sizeof(int32_t));
-      *data = blocks->size();
-      blocks->push_back(new_block);
-    }
-  }
-
-  // Pin all blocks, expecting they are pinned successfully.
-  void PinBlocks(const vector<BufferedBlockMgr::Block*>& blocks) {
-    for (int i = 0; i < blocks.size(); ++i) {
-      bool pinned;
-      ASSERT_OK(blocks[i]->Pin(&pinned));
-      ASSERT_TRUE(pinned);
-    }
-  }
-
-  // Pin all blocks. By default, expect no errors from Unpin() calls. If
-  // expected_error_codes is non-NULL, returning one of the error codes is
-  // also allowed.
-  void UnpinBlocks(const vector<BufferedBlockMgr::Block*>& blocks,
-      const vector<TErrorCode::type>* expected_error_codes = nullptr,
-      int delay_between_unpins_ms = 0) {
-    for (int i = 0; i < blocks.size(); ++i) {
-      Status status = blocks[i]->Unpin();
-      if (!status.ok() && expected_error_codes != nullptr) {
-        // Check if it's one of the expected errors.
-        bool is_expected_error = false;
-        for (TErrorCode::type code : *expected_error_codes) {
-          if (status.code() == code) {
-            is_expected_error = true;
-            break;
-          }
-        }
-        ASSERT_TRUE(is_expected_error) << status.msg().msg();
-      } else {
-        ASSERT_TRUE(status.ok()) << status.msg().msg();
-      }
-      if (delay_between_unpins_ms > 0) SleepForMs(delay_between_unpins_ms);
-    }
-  }
-
-  void DeleteBlocks(const vector<BufferedBlockMgr::Block*>& blocks) {
-    for (int i = 0; i < blocks.size(); ++i) {
-      blocks[i]->Delete();
-    }
-  }
-
-  void DeleteBlocks(const vector<pair<BufferedBlockMgr::Block*, int32_t>>& 
blocks) {
-    for (int i = 0; i < blocks.size(); ++i) {
-      blocks[i].first->Delete();
-    }
-  }
-
-  static void WaitForWrites(BufferedBlockMgr* block_mgr) {
-    vector<BufferedBlockMgr*> block_mgrs;
-    block_mgrs.push_back(block_mgr);
-    WaitForWrites(block_mgrs);
-  }
-
-  // Wait for writes issued through block managers to complete.
-  static void WaitForWrites(const vector<BufferedBlockMgr*>& block_mgrs) {
-    int max_attempts = WRITE_WAIT_MILLIS / WRITE_CHECK_INTERVAL_MILLIS;
-    for (int i = 0; i < max_attempts; ++i) {
-      SleepForMs(WRITE_CHECK_INTERVAL_MILLIS);
-      if (AllWritesComplete(block_mgrs)) return;
-    }
-    ASSERT_TRUE(false) << "Writes did not complete after " << 
WRITE_WAIT_MILLIS << "ms";
-  }
-
-  static bool AllWritesComplete(BufferedBlockMgr* block_mgr) {
-    return block_mgr->GetNumWritesOutstanding() == 0;
-  }
-
-  static bool AllWritesComplete(const vector<BufferedBlockMgr*>& block_mgrs) {
-    for (int i = 0; i < block_mgrs.size(); ++i) {
-      if (!AllWritesComplete(block_mgrs[i])) return false;
-    }
-    return true;
-  }
-
-  // Remove permissions for the temporary file at 'path' - all subsequent 
writes
-  // to the file should fail. Expects backing file has already been allocated.
-  static void DisableBackingFile(const string& path) {
-    EXPECT_GT(path.size(), 0);
-    EXPECT_EQ(0, chmod(path.c_str(), 0));
-    LOG(INFO) << "Injected fault by removing file permissions " << path;
-  }
-
-  // Check that the file backing the block has dir as a prefix of its path.
-  static bool BlockInDir(BufferedBlockMgr::Block* block, const string& dir) {
-    return block->TmpFilePath().find(dir) == 0;
-  }
-
-  // Find a block in the list that is backed by a file with the given 
directory as prefix
-  // of its path.
-  static BufferedBlockMgr::Block* FindBlockForDir(
-      const vector<BufferedBlockMgr::Block*>& blocks, const string& dir) {
-    for (int i = 0; i < blocks.size(); ++i) {
-      if (BlockInDir(blocks[i], dir)) return blocks[i];
-    }
-    return NULL;
-  }
-
-  void TestGetNewBlockImpl(int block_size) {
-    Status status;
-    int max_num_blocks = 5;
-    vector<BufferedBlockMgr::Block*> blocks;
-    BufferedBlockMgr* block_mgr;
-    BufferedBlockMgr::Client* client;
-    block_mgr = CreateMgrAndClient(0, max_num_blocks, block_size, 0, false, 
&client);
-    ASSERT_EQ(test_env_->TotalQueryMemoryConsumption(), 0);
-
-    // Allocate blocks until max_num_blocks, they should all succeed and memory
-    // usage should go up.
-    BufferedBlockMgr::Block* new_block;
-    BufferedBlockMgr::Block* first_block = NULL;
-    for (int i = 0; i < max_num_blocks; ++i) {
-      status = block_mgr->GetNewBlock(client, NULL, &new_block);
-      ASSERT_TRUE(new_block != NULL);
-      ASSERT_EQ(block_mgr->bytes_allocated(), (i + 1) * block_size);
-      if (first_block == NULL) first_block = new_block;
-      blocks.push_back(new_block);
-    }
-
-    // Trying to allocate a new one should fail.
-    ASSERT_OK(block_mgr->GetNewBlock(client, NULL, &new_block));
-    ASSERT_TRUE(new_block == NULL);
-    ASSERT_EQ(block_mgr->bytes_allocated(), max_num_blocks * block_size);
-
-    // We can allocate a new block by transferring an already allocated one.
-    uint8_t* old_buffer = first_block->buffer();
-    ASSERT_OK(block_mgr->GetNewBlock(client, first_block, &new_block));
-    ASSERT_TRUE(new_block != NULL);
-    ASSERT_EQ(old_buffer, new_block->buffer());
-    ASSERT_EQ(block_mgr->bytes_allocated(), max_num_blocks * block_size);
-    ASSERT_TRUE(!first_block->is_pinned());
-    blocks.push_back(new_block);
-
-    // Trying to allocate a new one should still fail.
-    ASSERT_OK(block_mgr->GetNewBlock(client, NULL, &new_block));
-    ASSERT_TRUE(new_block == NULL);
-    ASSERT_EQ(block_mgr->bytes_allocated(), max_num_blocks * block_size);
-
-    ASSERT_EQ(block_mgr->writes_issued(), 1);
-
-    DeleteBlocks(blocks);
-    TearDownMgrs();
-  }
-
-  void TestEvictionImpl(int block_size) {
-    ASSERT_GT(block_size, 0);
-    int max_num_buffers = 5;
-    BufferedBlockMgr* block_mgr;
-    BufferedBlockMgr::Client* client;
-    block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size, 0, false, 
&client);
-
-    // Check counters.
-    RuntimeProfile* profile = block_mgr->profile();
-    RuntimeProfile::Counter* buffered_pin = 
profile->GetCounter("BufferedPins");
-
-    vector<BufferedBlockMgr::Block*> blocks;
-    AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
-
-    ASSERT_EQ(block_mgr->bytes_allocated(), max_num_buffers * block_size);
-    for (BufferedBlockMgr::Block* block : blocks) block->Unpin();
-
-    // Re-pinning all blocks
-    for (int i = 0; i < blocks.size(); ++i) {
-      bool pinned;
-      ASSERT_OK(blocks[i]->Pin(&pinned));
-      ASSERT_TRUE(pinned);
-      ValidateBlock(blocks[i], i);
-    }
-    int buffered_pins_expected = blocks.size();
-    ASSERT_EQ(buffered_pin->value(), buffered_pins_expected);
-
-    // Unpin all blocks
-    for (BufferedBlockMgr::Block* block : blocks) block->Unpin();
-    // Get two new blocks.
-    AllocateBlocks(block_mgr, client, 2, &blocks);
-    // At least two writes must be issued. The first (num_blocks - 2) must be 
in memory.
-    ASSERT_GE(block_mgr->writes_issued(), 2);
-    for (int i = 0; i < (max_num_buffers - 2); ++i) {
-      bool pinned;
-      ASSERT_OK(blocks[i]->Pin(&pinned));
-      ASSERT_TRUE(pinned);
-      ValidateBlock(blocks[i], i);
-    }
-    ASSERT_GE(buffered_pin->value(), buffered_pins_expected);
-    DeleteBlocks(blocks);
-    TearDownMgrs();
-  }
-
-  // Test that randomly issues GetFreeBlock(), Pin(), Unpin(), Delete() and 
Close()
-  // calls. All calls made are legal - error conditions are not expected until 
the first
-  // call to Close(). This is called 2 times with encryption+integrity on/off.
-  // When executed in single-threaded mode 'tid' should be SINGLE_THREADED_TID.
-  static const int SINGLE_THREADED_TID = -1;
-  void TestRandomInternalImpl(RuntimeState* state, BufferedBlockMgr* block_mgr,
-      int num_buffers, int tid) {
-    ASSERT_TRUE(block_mgr != NULL);
-    const int num_iterations = 10000;
-    const int iters_before_close = num_iterations - 1000;
-    bool close_called = false;
-    unordered_map<BufferedBlockMgr::Block*, int> pinned_block_map;
-    vector<pair<BufferedBlockMgr::Block*, int32_t>> pinned_blocks;
-    unordered_map<BufferedBlockMgr::Block*, int> unpinned_block_map;
-    vector<pair<BufferedBlockMgr::Block*, int32_t>> unpinned_blocks;
-
-    typedef enum { Pin, New, Unpin, Delete, Close } ApiFunction;
-    ApiFunction api_function;
-
-    BufferedBlockMgr::Client* client;
-    ASSERT_OK(
-        block_mgr->RegisterClient("", 0, false, NewClientTracker(state), 
state, &client));
-    ASSERT_TRUE(client != NULL);
-
-    pinned_blocks.reserve(num_buffers);
-    BufferedBlockMgr::Block* new_block;
-    for (int i = 0; i < num_iterations; ++i) {
-      if ((i % 20000) == 0) LOG (ERROR) << " Iteration " << i << endl;
-      if (i > iters_before_close && (rand() % 5 == 0)) {
-        api_function = Close;
-      } else if (pinned_blocks.size() == 0 && unpinned_blocks.size() == 0) {
-        api_function = New;
-      } else if (pinned_blocks.size() == 0) {
-        // Pin or New. Can't unpin or delete.
-        api_function = static_cast<ApiFunction>(rand() % 2);
-      } else if (pinned_blocks.size() >= num_buffers) {
-        // Unpin or delete. Can't pin or get new.
-        api_function = static_cast<ApiFunction>(2 + (rand() % 2));
-      } else if (unpinned_blocks.size() == 0) {
-        // Can't pin. Unpin, new or delete.
-        api_function = static_cast<ApiFunction>(1 + (rand() % 3));
-      } else {
-        // Any api function.
-        api_function = static_cast<ApiFunction>(rand() % 4);
-      }
-
-      pair<BufferedBlockMgr::Block*, int32_t> block_data;
-      int rand_pick = 0;
-      int32_t* data = NULL;
-      bool pinned = false;
-      Status status;
-      switch (api_function) {
-        case New:
-          status = block_mgr->GetNewBlock(client, NULL, &new_block);
-          if (close_called || (tid != SINGLE_THREADED_TID && 
status.IsCancelled())) {
-            ASSERT_TRUE(new_block == NULL);
-            ASSERT_TRUE(status.IsCancelled());
-            continue;
-          }
-          ASSERT_OK(status);
-          ASSERT_TRUE(new_block != NULL);
-          data = MakeRandomSizeData(new_block);
-          block_data = make_pair(new_block, *data);
-
-          pinned_blocks.push_back(block_data);
-          pinned_block_map.insert(make_pair(block_data.first, 
pinned_blocks.size() - 1));
-          break;
-        case Pin:
-          rand_pick = rand() % unpinned_blocks.size();
-          block_data = unpinned_blocks[rand_pick];
-          status = block_data.first->Pin(&pinned);
-          if (close_called || (tid != SINGLE_THREADED_TID && 
status.IsCancelled())) {
-            ASSERT_TRUE(status.IsCancelled());
-            // In single-threaded runs the block should not have been pinned.
-            // In multi-threaded runs Pin() may return the block pinned but 
the status to
-            // be cancelled. In this case we could move the block from 
unpinned_blocks
-            // to pinned_blocks. We do not do that because after IsCancelled() 
no actual
-            // block operations should take place.
-            if (tid == SINGLE_THREADED_TID) ASSERT_FALSE(pinned);
-            continue;
-          }
-          ASSERT_OK(status);
-          ASSERT_TRUE(pinned);
-          ValidateRandomSizeData(block_data.first, block_data.second);
-          unpinned_blocks[rand_pick] = unpinned_blocks.back();
-          unpinned_blocks.pop_back();
-          unpinned_block_map[unpinned_blocks[rand_pick].first] = rand_pick;
-
-          pinned_blocks.push_back(block_data);
-          pinned_block_map.insert(make_pair(block_data.first, 
pinned_blocks.size() - 1));
-          break;
-        case Unpin:
-          rand_pick = rand() % pinned_blocks.size();
-          block_data = pinned_blocks[rand_pick];
-          status = block_data.first->Unpin();
-          if (close_called || (tid != SINGLE_THREADED_TID && 
status.IsCancelled())) {
-            ASSERT_TRUE(status.IsCancelled());
-            continue;
-          }
-          ASSERT_OK(status);
-          pinned_blocks[rand_pick] = pinned_blocks.back();
-          pinned_blocks.pop_back();
-          pinned_block_map[pinned_blocks[rand_pick].first] = rand_pick;
-
-          unpinned_blocks.push_back(block_data);
-          unpinned_block_map.insert(make_pair(block_data.first,
-              unpinned_blocks.size() - 1));
-          break;
-        case Delete:
-          rand_pick = rand() % pinned_blocks.size();
-          block_data = pinned_blocks[rand_pick];
-          block_data.first->Delete();
-          pinned_blocks[rand_pick] = pinned_blocks.back();
-          pinned_blocks.pop_back();
-          pinned_block_map[pinned_blocks[rand_pick].first] = rand_pick;
-          break;
-        case Close:
-          block_mgr->Cancel();
-          close_called = true;
-          break;
-      }
-    }
-
-    // The client needs to delete all its blocks.
-    DeleteBlocks(pinned_blocks);
-    DeleteBlocks(unpinned_blocks);
-  }
-
-  // Single-threaded execution of the TestRandomInternalImpl.
-  void TestRandomInternalSingle(int block_size) {
-    ASSERT_GT(block_size, 0);
-    ASSERT_TRUE(test_env_.get() != NULL);
-    const int max_num_buffers = 100;
-    RuntimeState* state;
-    BufferedBlockMgr* block_mgr = CreateMgr(0, max_num_buffers, block_size, 
&state);
-    TestRandomInternalImpl(state, block_mgr, max_num_buffers, 
SINGLE_THREADED_TID);
-    TearDownMgrs();
-  }
-
-  // Multi-threaded execution of the TestRandomInternalImpl.
-  void TestRandomInternalMulti(int num_threads, int block_size) {
-    ASSERT_GT(num_threads, 0);
-    ASSERT_GT(block_size, 0);
-    ASSERT_TRUE(test_env_.get() != NULL);
-    const int max_num_buffers = 100;
-    RuntimeState* state;
-    BufferedBlockMgr* block_mgr = CreateMgr(0, num_threads * max_num_buffers, 
block_size,
-        &state);
-
-    thread_group workers;
-    for (int i = 0; i < num_threads; ++i) {
-      thread* t = new 
thread(bind(&BufferedBlockMgrTest::TestRandomInternalImpl, this,
-                                  state, block_mgr, max_num_buffers, i));
-      workers.add_thread(t);
-    }
-    workers.join_all();
-    TearDownMgrs();
-  }
-
-  // Repeatedly call BufferedBlockMgr::Create() and 
BufferedBlockMgr::~BufferedBlockMgr().
-  void CreateDestroyThread(RuntimeState* state) {
-    const int num_buffers = 10;
-    const int iters = 10000;
-    for (int i = 0; i < iters; ++i) {
-      shared_ptr<BufferedBlockMgr> mgr;
-      Status status = BufferedBlockMgr::Create(state, 
state->query_mem_tracker(),
-          state->runtime_profile(), test_env_->tmp_file_mgr(), block_size_ * 
num_buffers,
-          block_size_, &mgr);
-    }
-  }
-
-  // IMPALA-2286: Test for races between BufferedBlockMgr::Create() and
-  // BufferedBlockMgr::~BufferedBlockMgr().
-  void CreateDestroyMulti() {
-    const int num_threads = 8;
-    thread_group workers;
-    // Create a shared RuntimeState with no BufferedBlockMgr.
-    RuntimeState shared_state(TQueryCtx(), test_env_->exec_env());
-
-    for (int i = 0; i < num_threads; ++i) {
-      thread* t = new thread(
-          bind(&BufferedBlockMgrTest::CreateDestroyThread, this, 
&shared_state));
-      workers.add_thread(t);
-    }
-    workers.join_all();
-    shared_state.ReleaseResources();
-  }
-
-  // Test that in-flight IO operations are correctly handled on tear down.
-  // write: if true, tear down while write operations are in flight, otherwise 
tear down
-  //    during read operations.
-  void TestDestructDuringIO(bool write);
-
-  /// Test for IMPALA-2252: race when tearing down runtime state and block mgr 
after query
-  /// cancellation. Simulates query cancellation while writes are in flight. 
Forces the
-  /// block mgr to have a longer lifetime than the runtime state. If 
write_error is true,
-  /// force writes to hit errors. If wait_for_writes is true, wait for writes 
to complete
-  /// before destroying block mgr.
-  void TestRuntimeStateTeardown(bool write_error, bool wait_for_writes);
-
-  void TestWriteError(int write_delay_ms);
-
-  scoped_ptr<TestEnv> test_env_;
-  ObjectPool pool_;
-  vector<string> created_tmp_dirs_;
-};
-
-TEST_F(BufferedBlockMgrTest, GetNewBlock) {
-  TestGetNewBlockImpl(1024);
-  TestGetNewBlockImpl(8 * 1024);
-  TestGetNewBlockImpl(8 * 1024 * 1024);
-}
-
-TEST_F(BufferedBlockMgrTest, GetNewBlockSmallBlocks) {
-  const int block_size = 1024;
-  int max_num_blocks = 3;
-  BufferedBlockMgr* block_mgr;
-  BufferedBlockMgr::Client* client;
-  block_mgr = CreateMgrAndClient(0, max_num_blocks, block_size, 0, false, 
&client);
-  MemTracker* client_tracker = block_mgr->get_tracker(client);
-  ASSERT_EQ(0, test_env_->TotalQueryMemoryConsumption());
-
-  vector<BufferedBlockMgr::Block*> blocks;
-
-  // Allocate a small block.
-  BufferedBlockMgr::Block* new_block = NULL;
-  ASSERT_OK(block_mgr->GetNewBlock(client, NULL, &new_block, 128));
-  ASSERT_TRUE(new_block != NULL);
-  ASSERT_EQ(block_mgr->bytes_allocated(), 0);
-  ASSERT_EQ(block_mgr->mem_tracker()->consumption(), 0);
-  ASSERT_EQ(client_tracker->consumption(), 128);
-  ASSERT_TRUE(new_block->is_pinned());
-  ASSERT_EQ(new_block->BytesRemaining(), 128);
-  ASSERT_TRUE(new_block->buffer() != NULL);
-  blocks.push_back(new_block);
-
-  // Allocate a normal block
-  ASSERT_OK(block_mgr->GetNewBlock(client, NULL, &new_block));
-  ASSERT_TRUE(new_block != NULL);
-  ASSERT_EQ(block_mgr->bytes_allocated(), block_mgr->max_block_size());
-  ASSERT_EQ(block_mgr->mem_tracker()->consumption(), 
block_mgr->max_block_size());
-  ASSERT_EQ(client_tracker->consumption(), 128 + block_mgr->max_block_size());
-  ASSERT_TRUE(new_block->is_pinned());
-  ASSERT_EQ(new_block->BytesRemaining(), block_mgr->max_block_size());
-  ASSERT_TRUE(new_block->buffer() != NULL);
-  blocks.push_back(new_block);
-
-  // Allocate another small block.
-  ASSERT_OK(block_mgr->GetNewBlock(client, NULL, &new_block, 512));
-  ASSERT_TRUE(new_block != NULL);
-  ASSERT_EQ(block_mgr->bytes_allocated(), block_mgr->max_block_size());
-  ASSERT_EQ(block_mgr->mem_tracker()->consumption(), 
block_mgr->max_block_size());
-  ASSERT_EQ(client_tracker->consumption(), 128 + 512 + 
block_mgr->max_block_size());
-  ASSERT_TRUE(new_block->is_pinned());
-  ASSERT_EQ(new_block->BytesRemaining(), 512);
-  ASSERT_TRUE(new_block->buffer() != NULL);
-  blocks.push_back(new_block);
-
-  // Should be able to unpin and pin the middle block
-  ASSERT_OK(blocks[1]->Unpin());
-
-  bool pinned;
-  ASSERT_OK(blocks[1]->Pin(&pinned));
-  ASSERT_TRUE(pinned);
-
-  DeleteBlocks(blocks);
-  TearDownMgrs();
-}
-
-// Test that pinning more blocks than the max available buffers.
-TEST_F(BufferedBlockMgrTest, Pin) {
-  int max_num_blocks = 5;
-  const int block_size = 1024;
-  BufferedBlockMgr* block_mgr;
-  BufferedBlockMgr::Client* client;
-  block_mgr = CreateMgrAndClient(0, max_num_blocks, block_size, 0, false, 
&client);
-
-  vector<BufferedBlockMgr::Block*> blocks;
-  AllocateBlocks(block_mgr, client, max_num_blocks, &blocks);
-
-  // Unpin them all.
-  for (int i = 0; i < blocks.size(); ++i) {
-    ASSERT_OK(blocks[i]->Unpin());
-  }
-
-  // Allocate more, this should work since we just unpinned some blocks.
-  AllocateBlocks(block_mgr, client, max_num_blocks, &blocks);
-
-  // Try to pin a unpinned block, this should not be possible.
-  bool pinned;
-  ASSERT_OK(blocks[0]->Pin(&pinned));
-  ASSERT_FALSE(pinned);
-
-  // Unpin all blocks.
-  for (int i = 0; i < blocks.size(); ++i) {
-    ASSERT_OK(blocks[i]->Unpin());
-  }
-
-  // Should be able to pin max_num_blocks blocks.
-  for (int i = 0; i < max_num_blocks; ++i) {
-    ASSERT_OK(blocks[i]->Pin(&pinned));
-    ASSERT_TRUE(pinned);
-  }
-
-  // Can't pin any more though.
-  ASSERT_OK(blocks[max_num_blocks]->Pin(&pinned));
-  ASSERT_FALSE(pinned);
-
-  DeleteBlocks(blocks);
-  TearDownMgrs();
-}
-
-// Test the eviction policy of the block mgr. No writes issued until more than
-// the max available buffers are allocated. Writes must be issued in LIFO 
order.
-TEST_F(BufferedBlockMgrTest, Eviction) {
-  TestEvictionImpl(1024);
-  TestEvictionImpl(8 * 1024 * 1024);
-}
-
-// Test deletion and reuse of blocks.
-TEST_F(BufferedBlockMgrTest, Deletion) {
-  int max_num_buffers = 5;
-  const int block_size = 1024;
-  BufferedBlockMgr* block_mgr;
-  BufferedBlockMgr::Client* client;
-  block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size, 0, false, 
&client);
-
-  // Check counters.
-  RuntimeProfile* profile = block_mgr->profile();
-  RuntimeProfile::Counter* recycled_cnt = 
profile->GetCounter("BlocksRecycled");
-  RuntimeProfile::Counter* created_cnt = profile->GetCounter("BlocksCreated");
-
-  vector<BufferedBlockMgr::Block*> blocks;
-  AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
-  ASSERT_EQ(created_cnt->value(), max_num_buffers);
-
-  DeleteBlocks(blocks);
-  blocks.clear();
-  AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
-  ASSERT_EQ(created_cnt->value(), max_num_buffers);
-  ASSERT_EQ(recycled_cnt->value(), max_num_buffers);
-
-  DeleteBlocks(blocks);
-  TearDownMgrs();
-}
-
-// Delete blocks of various sizes and statuses to exercise the different code 
paths.
-// This relies on internal validation in block manager to detect many errors.
-TEST_F(BufferedBlockMgrTest, DeleteSingleBlocks) {
-  int max_num_buffers = 16;
-  BufferedBlockMgr::Client* client;
-  BufferedBlockMgr* block_mgr =
-      CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, &client);
-  MemTracker* client_tracker = block_mgr->get_tracker(client);
-
-  // Pinned I/O block.
-  BufferedBlockMgr::Block* new_block;
-  ASSERT_OK(block_mgr->GetNewBlock(client, NULL, &new_block));
-  ASSERT_TRUE(new_block != NULL);
-  ASSERT_TRUE(new_block->is_pinned());
-  ASSERT_TRUE(new_block->is_max_size());
-  new_block->Delete();
-  ASSERT_EQ(0, client_tracker->consumption());
-
-  // Pinned non-I/O block.
-  int small_block_size = 128;
-  ASSERT_OK(block_mgr->GetNewBlock(client, NULL, &new_block, 
small_block_size));
-  ASSERT_TRUE(new_block != NULL);
-  ASSERT_TRUE(new_block->is_pinned());
-  ASSERT_EQ(small_block_size, client_tracker->consumption());
-  new_block->Delete();
-  ASSERT_EQ(0, client_tracker->consumption());
-
-  // Unpinned I/O block - delete after written to disk.
-  ASSERT_OK(block_mgr->GetNewBlock(client, NULL, &new_block));
-  ASSERT_TRUE(new_block != NULL);
-  ASSERT_TRUE(new_block->is_pinned());
-  ASSERT_TRUE(new_block->is_max_size());
-  new_block->Unpin();
-  ASSERT_FALSE(new_block->is_pinned());
-  WaitForWrites(block_mgr);
-  new_block->Delete();
-  ASSERT_EQ(client_tracker->consumption(), 0);
-
-  // Unpinned I/O block - delete before written to disk.
-  ASSERT_OK(block_mgr->GetNewBlock(client, NULL, &new_block));
-  ASSERT_TRUE(new_block != NULL);
-  ASSERT_TRUE(new_block->is_pinned());
-  ASSERT_TRUE(new_block->is_max_size());
-  new_block->Unpin();
-  ASSERT_FALSE(new_block->is_pinned());
-  new_block->Delete();
-  WaitForWrites(block_mgr);
-  ASSERT_EQ(client_tracker->consumption(), 0);
-
-  TearDownMgrs();
-}
-
-// This exercises a code path where:
-// 1. A block A is unpinned.
-// 2. A block B is unpinned.
-// 3. A write for block A is initiated.
-// 4. Block A is pinned.
-// 5. Block B is pinned, with block A passed in to be deleted.
-//    Block A's buffer will be transferred to block B.
-// 6. The write for block A completes.
-// Previously there was a bug (IMPALA-3936) where the buffer transfer happened 
before the
-// write completed. There were also various hangs related to missing condition 
variable
-// notifications.
-TEST_F(BufferedBlockMgrTest, TransferBufferDuringWrite) {
-  const int trials = 5;
-  const int max_num_buffers = 2;
-  BufferedBlockMgr::Client* client;
-  RuntimeState* query_state;
-  BufferedBlockMgr* block_mgr = CreateMgrAndClient(
-      0, max_num_buffers, block_size_, 1, false, &client, &query_state);
-
-  for (int trial = 0; trial < trials; ++trial) {
-    for (int delay_ms = 0; delay_ms <= 10; delay_ms += 5) {
-      // Force writes to be delayed to enlarge window of opportunity for bug.
-      block_mgr->set_debug_write_delay_ms(delay_ms);
-      vector<BufferedBlockMgr::Block*> blocks;
-      AllocateBlocks(block_mgr, client, 2, &blocks);
-
-      // Force the second block to be written and have its buffer freed.
-      // We only have one buffer to share between the first and second blocks 
now.
-      ASSERT_OK(blocks[1]->Unpin());
-
-      // Create another client. Reserving different numbers of buffers can 
send it
-      // down different code paths because the original client is entitled to 
different
-      // number of buffers.
-      int reserved_buffers = trial % max_num_buffers;
-      BufferedBlockMgr::Client* tmp_client;
-      ASSERT_OK(block_mgr->RegisterClient("tmp_client", reserved_buffers, 
false,
-          NewClientTracker(query_state), query_state, &tmp_client));
-      BufferedBlockMgr::Block* tmp_block;
-      ASSERT_OK(block_mgr->GetNewBlock(tmp_client, NULL, &tmp_block));
-
-      // Initiate the write, repin the block, then immediately try to swap the 
buffer to
-      // the second block while the write is still in flight.
-      ASSERT_OK(blocks[0]->Unpin());
-      bool pinned;
-      ASSERT_OK(blocks[0]->Pin(&pinned));
-      ASSERT_TRUE(pinned);
-      ASSERT_OK(blocks[1]->Pin(&pinned, blocks[0], false));
-      ASSERT_TRUE(pinned);
-
-      blocks[1]->Delete();
-      tmp_block->Delete();
-      block_mgr->ClearReservations(tmp_client);
-    }
-  }
-}
-
-// Test that all APIs return cancelled after close.
-TEST_F(BufferedBlockMgrTest, Close) {
-  int max_num_buffers = 5;
-  const int block_size = 1024;
-  BufferedBlockMgr* block_mgr;
-  BufferedBlockMgr::Client* client;
-  block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size, 0, false, 
&client);
-
-  vector<BufferedBlockMgr::Block*> blocks;
-  AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
-
-  block_mgr->Cancel();
-
-  BufferedBlockMgr::Block* new_block;
-  Status status = block_mgr->GetNewBlock(client, NULL, &new_block);
-  ASSERT_TRUE(status.IsCancelled());
-  ASSERT_TRUE(new_block == NULL);
-  status = blocks[0]->Unpin();
-  ASSERT_TRUE(status.IsCancelled());
-  bool pinned;
-  status = blocks[0]->Pin(&pinned);
-  ASSERT_TRUE(status.IsCancelled());
-
-  DeleteBlocks(blocks);
-  TearDownMgrs();
-}
-
-TEST_F(BufferedBlockMgrTest, DestructDuringWrite) {
-  const int trials = 20;
-  const int max_num_buffers = 5;
-
-  for (int trial = 0; trial < trials; ++trial) {
-    BufferedBlockMgr::Client* client;
-    BufferedBlockMgr* block_mgr =
-        CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, &client);
-
-    vector<BufferedBlockMgr::Block*> blocks;
-    AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
-
-    // Unpin will initiate writes.
-    UnpinBlocks(blocks);
-
-    // Writes should still be in flight when blocks are deleted.
-    DeleteBlocks(blocks);
-
-    // Destruct block manager while blocks are deleted and writes are in 
flight.
-    TearDownMgrs();
-  }
-  // Destroying test environment will check that all writes have completed.
-}
-
-void BufferedBlockMgrTest::TestRuntimeStateTeardown(
-    bool write_error, bool wait_for_writes) {
-  const int max_num_buffers = 10;
-  RuntimeState* state;
-  BufferedBlockMgr::Client* client;
-  CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, &client, 
&state);
-
-  // Hold extra references to block mgr and query state so they outlive 
RuntimeState.
-  shared_ptr<BufferedBlockMgr> block_mgr;
-  QueryState::ScopedRef qs(state->query_id());
-  Status status = BufferedBlockMgr::Create(state, state->query_mem_tracker(),
-      state->runtime_profile(), test_env_->tmp_file_mgr(), 0, block_size_, 
&block_mgr);
-  ASSERT_TRUE(status.ok());
-  ASSERT_TRUE(block_mgr != NULL);
-
-  vector<BufferedBlockMgr::Block*> blocks;
-  AllocateBlocks(block_mgr.get(), client, max_num_buffers, &blocks);
-
-  if (write_error) {
-    // Force flushing blocks to disk then remove temporary file to force 
writes to fail.
-    UnpinBlocks(blocks);
-    vector<BufferedBlockMgr::Block*> more_blocks;
-    AllocateBlocks(block_mgr.get(), client, max_num_buffers, &more_blocks);
-
-    const string& tmp_file_path = blocks[0]->TmpFilePath();
-    DeleteBlocks(more_blocks);
-    PinBlocks(blocks);
-    DisableBackingFile(tmp_file_path);
-  }
-
-  // Unpin will initiate writes. If the write error propagates fast enough, 
some Unpin()
-  // calls may see a cancelled block mgr.
-  vector<TErrorCode::type> cancelled_code = {TErrorCode::CANCELLED};
-  UnpinBlocks(blocks, write_error ? &cancelled_code : nullptr);
-
-  // Tear down while writes are in flight. The block mgr may outlive the 
runtime state
-  // because it may be referenced by other runtime states. This test simulates 
this
-  // scenario by holding onto a reference to the block mgr. This should be 
safe so
-  // long as blocks are properly deleted before the runtime state is torn down.
-  DeleteBlocks(blocks);
-  test_env_->TearDownQueries();
-
-  // Optionally wait for writes to complete after cancellation.
-  if (wait_for_writes) WaitForWrites(block_mgr.get());
-  block_mgr.reset();
-
-  ASSERT_EQ(test_env_->TotalQueryMemoryConsumption(), 0);
-}
-
-TEST_F(BufferedBlockMgrTest, RuntimeStateTeardown) {
-  TestRuntimeStateTeardown(false, false);
-}
-
-TEST_F(BufferedBlockMgrTest, RuntimeStateTeardownWait) {
-  TestRuntimeStateTeardown(false, true);
-}
-
-TEST_F(BufferedBlockMgrTest, RuntimeStateTeardownWriteError) {
-  TestRuntimeStateTeardown(true, true);
-}
-
-// Regression test for IMPALA-2927 write complete with cancelled runtime state
-TEST_F(BufferedBlockMgrTest, WriteCompleteWithCancelledRuntimeState) {
-  const int max_num_buffers = 10;
-  RuntimeState* state;
-  BufferedBlockMgr::Client* client;
-  BufferedBlockMgr* block_mgr =
-      CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, &client, 
&state);
-
-  vector<BufferedBlockMgr::Block*> blocks;
-  AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
-
-  // Force flushing blocks to disk so that more writes are in flight.
-  UnpinBlocks(blocks);
-
-  // Cancel the runtime state and re-pin the blocks while writes are in flight 
to check
-  // that WriteComplete() handles the case ok.
-  state->set_is_cancelled();
-  PinBlocks(blocks);
-
-  WaitForWrites(block_mgr);
-  DeleteBlocks(blocks);
-}
-
-// Remove write permissions on scratch files. Return # of scratch files.
-static int remove_scratch_perms() {
-  int num_files = 0;
-  directory_iterator dir_it(SCRATCH_DIR);
-  for (; dir_it != directory_iterator(); ++dir_it) {
-    ++num_files;
-    chmod(dir_it->path().c_str(), 0);
-  }
-
-  return num_files;
-}
-
-// Test that the block manager behaves correctly after a write error.  Delete 
the scratch
-// directory before an operation that would cause a write and test that 
subsequent API
-// calls return 'CANCELLED' correctly.
-void BufferedBlockMgrTest::TestWriteError(int write_delay_ms) {
-  int max_num_buffers = 2;
-  const int block_size = 1024;
-  BufferedBlockMgr* block_mgr;
-  BufferedBlockMgr::Client* client;
-  block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size, 0, false, 
&client);
-  block_mgr->set_debug_write_delay_ms(write_delay_ms);
-
-  vector<BufferedBlockMgr::Block*> blocks;
-  AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
-  // Unpin two blocks here, to ensure that backing storage is allocated in tmp 
file.
-  UnpinBlocks(blocks);
-  WaitForWrites(block_mgr);
-  // Repin the blocks
-  PinBlocks(blocks);
-  // Remove the backing storage so that future writes will fail
-  int num_files = remove_scratch_perms();
-  ASSERT_GT(num_files, 0);
-  vector<TErrorCode::type> expected_error_codes = {TErrorCode::CANCELLED,
-      TErrorCode::SCRATCH_ALLOCATION_FAILED};
-  // Give the first write a chance to fail before the second write starts.
-  int interval_ms = 10;
-  UnpinBlocks(blocks, &expected_error_codes, interval_ms);
-  WaitForWrites(block_mgr);
-  // Subsequent calls should fail.
-  DeleteBlocks(blocks);
-  BufferedBlockMgr::Block* new_block;
-  ASSERT_TRUE(block_mgr->GetNewBlock(client, NULL, &new_block).IsCancelled());
-  ASSERT_TRUE(new_block == NULL);
-
-  TearDownMgrs();
-}
-
-TEST_F(BufferedBlockMgrTest, WriteError) {
-  TestWriteError(0);
-}
-
-// Regression test for IMPALA-4842 - inject a delay in the write to
-// reproduce the issue.
-TEST_F(BufferedBlockMgrTest, WriteErrorWriteDelay) {
-  TestWriteError(100);
-}
-
-// Test block manager error handling when temporary file space cannot be 
allocated to
-// back an unpinned buffer.
-TEST_F(BufferedBlockMgrTest, TmpFileAllocateError) {
-  int max_num_buffers = 2;
-  BufferedBlockMgr::Client* client;
-  BufferedBlockMgr* block_mgr =
-      CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, &client);
-
-  vector<BufferedBlockMgr::Block*> blocks;
-  AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
-  // Unpin a block, forcing a write.
-  ASSERT_OK(blocks[0]->Unpin());
-  WaitForWrites(block_mgr);
-  // Remove temporary files - subsequent operations will fail.
-  int num_files = remove_scratch_perms();
-  ASSERT_TRUE(num_files > 0);
-  // Current implementation will not fail here until it attempts to write the 
file.
-  // This behavior is not contractual but we want to know if it changes 
accidentally.
-  ASSERT_OK(blocks[1]->Unpin());
-
-  // Write failure should cancel query
-  WaitForWrites(block_mgr);
-  ASSERT_TRUE(block_mgr->IsCancelled());
-
-  DeleteBlocks(blocks);
-  TearDownMgrs();
-}
-
-// Test that the block manager is able to blacklist a temporary device 
correctly after a
-// write error. The query that encountered the write error should not allocate 
more
-// blocks on that device, but existing blocks on the device will remain in use 
and future
-// queries will use the device.
-TEST_F(BufferedBlockMgrTest, WriteErrorBlacklist) {
-  // Set up two buffered block managers with two temporary dirs.
-  vector<string> tmp_dirs = InitMultipleTmpDirs(2);
-  // Simulate two concurrent queries.
-  const int NUM_BLOCK_MGRS = 2;
-  const int MAX_NUM_BLOCKS = 4;
-  int blocks_per_mgr = MAX_NUM_BLOCKS / NUM_BLOCK_MGRS;
-  vector<BufferedBlockMgr*> block_mgrs;
-  vector<BufferedBlockMgr::Client*> clients;
-  CreateMgrsAndClients(
-      0, NUM_BLOCK_MGRS, blocks_per_mgr, block_size_, 0, false, &block_mgrs, 
&clients);
-
-  // Allocate files for all 2x2 combinations by unpinning blocks.
-  vector<vector<BufferedBlockMgr::Block*>> blocks;
-  vector<BufferedBlockMgr::Block*> all_blocks;
-  for (int i = 0; i < NUM_BLOCK_MGRS; ++i) {
-    vector<BufferedBlockMgr::Block*> mgr_blocks;
-    AllocateBlocks(block_mgrs[i], clients[i], blocks_per_mgr, &mgr_blocks);
-    UnpinBlocks(mgr_blocks);
-    for (int j = 0; j < blocks_per_mgr; ++j) {
-      LOG(INFO) << "Manager " << i << " Block " << j << " backed by file "
-                << mgr_blocks[j]->TmpFilePath();
-    }
-    blocks.push_back(mgr_blocks);
-    all_blocks.insert(all_blocks.end(), mgr_blocks.begin(), mgr_blocks.end());
-  }
-  WaitForWrites(block_mgrs);
-  int error_mgr = 0;
-  int no_error_mgr = 1;
-  const string& error_dir = tmp_dirs[0];
-  const string& good_dir = tmp_dirs[1];
-  // Delete one file from first scratch dir for first block manager.
-  BufferedBlockMgr::Block* error_block = FindBlockForDir(blocks[error_mgr], 
error_dir);
-  ASSERT_TRUE(error_block != NULL) << "Expected a tmp file in dir " << 
error_dir;
-  const string& error_file_path = error_block->TmpFilePath();
-  PinBlocks(all_blocks);
-  DisableBackingFile(error_file_path);
-  UnpinBlocks(all_blocks); // Should succeed since writes occur asynchronously
-  WaitForWrites(block_mgrs);
-  // Both block managers have a usable tmp directory so should still be usable.
-  ASSERT_FALSE(block_mgrs[error_mgr]->IsCancelled());
-  ASSERT_FALSE(block_mgrs[no_error_mgr]->IsCancelled());
-  // Temporary device with error should still be active.
-  vector<TmpFileMgr::DeviceId> active_tmp_devices =
-      test_env_->tmp_file_mgr()->ActiveTmpDevices();
-  ASSERT_EQ(tmp_dirs.size(), active_tmp_devices.size());
-  for (int i = 0; i < active_tmp_devices.size(); ++i) {
-    const string& device_path =
-        test_env_->tmp_file_mgr()->GetTmpDirPath(active_tmp_devices[i]);
-    ASSERT_EQ(string::npos, error_dir.find(device_path));
-  }
-
-  // The error block manager should only allocate from the device that had no 
error.
-  // The non-error block manager should continue using both devices, since it 
didn't
-  // encounter a write error itself.
-  vector<BufferedBlockMgr::Block*> error_new_blocks;
-  AllocateBlocks(
-      block_mgrs[error_mgr], clients[error_mgr], blocks_per_mgr, 
&error_new_blocks);
-  UnpinBlocks(error_new_blocks);
-  WaitForWrites(block_mgrs);
-  EXPECT_TRUE(FindBlockForDir(error_new_blocks, good_dir) != NULL);
-  EXPECT_TRUE(FindBlockForDir(error_new_blocks, error_dir) == NULL);
-  for (int i = 0; i < error_new_blocks.size(); ++i) {
-    LOG(INFO) << "Newly created block backed by file "
-              << error_new_blocks[i]->TmpFilePath();
-    EXPECT_TRUE(BlockInDir(error_new_blocks[i], good_dir));
-  }
-  DeleteBlocks(error_new_blocks);
-
-  PinBlocks(blocks[no_error_mgr]);
-  UnpinBlocks(blocks[no_error_mgr]);
-  WaitForWrites(block_mgrs);
-  EXPECT_TRUE(FindBlockForDir(blocks[no_error_mgr], good_dir) != NULL);
-  EXPECT_TRUE(FindBlockForDir(blocks[no_error_mgr], error_dir) != NULL);
-
-  // The second block manager should use the bad directory for new blocks since
-  // blacklisting is per-manager, not global.
-  vector<BufferedBlockMgr::Block*> no_error_new_blocks;
-  AllocateBlocks(block_mgrs[no_error_mgr], clients[no_error_mgr], 
blocks_per_mgr,
-      &no_error_new_blocks);
-  UnpinBlocks(no_error_new_blocks);
-  WaitForWrites(block_mgrs);
-  EXPECT_TRUE(FindBlockForDir(no_error_new_blocks, good_dir) != NULL);
-  EXPECT_TRUE(FindBlockForDir(no_error_new_blocks, error_dir) != NULL);
-  DeleteBlocks(no_error_new_blocks);
-
-  // A new block manager should use the both dirs for backing storage.
-  BufferedBlockMgr::Client* new_client;
-  BufferedBlockMgr* new_block_mgr =
-      CreateMgrAndClient(9999, blocks_per_mgr, block_size_, 0, false, 
&new_client);
-  vector<BufferedBlockMgr::Block*> new_mgr_blocks;
-  AllocateBlocks(new_block_mgr, new_client, blocks_per_mgr, &new_mgr_blocks);
-  UnpinBlocks(new_mgr_blocks);
-  WaitForWrites(block_mgrs);
-  EXPECT_TRUE(FindBlockForDir(new_mgr_blocks, good_dir) != NULL);
-  EXPECT_TRUE(FindBlockForDir(new_mgr_blocks, error_dir) != NULL);
-  DeleteBlocks(new_mgr_blocks);
-
-  DeleteBlocks(all_blocks);
-}
-
-// Check that allocation error resulting from removal of directory results in 
blocks
-/// being allocated in other directories.
-TEST_F(BufferedBlockMgrTest, AllocationErrorHandling) {
-  // Set up two buffered block managers with two temporary dirs.
-  vector<string> tmp_dirs = InitMultipleTmpDirs(2);
-  // Simulate two concurrent queries.
-  int num_block_mgrs = 2;
-  int max_num_blocks = 4;
-  int blocks_per_mgr = max_num_blocks / num_block_mgrs;
-  vector<RuntimeState*> runtime_states;
-  vector<BufferedBlockMgr*> block_mgrs;
-  vector<BufferedBlockMgr::Client*> clients;
-  CreateMgrsAndClients(
-      0, num_block_mgrs, blocks_per_mgr, block_size_, 0, false, &block_mgrs, 
&clients);
-
-  // Allocate files for all 2x2 combinations by unpinning blocks.
-  vector<vector<BufferedBlockMgr::Block*>> blocks;
-  for (int i = 0; i < num_block_mgrs; ++i) {
-    vector<BufferedBlockMgr::Block*> mgr_blocks;
-    LOG(INFO) << "Iter " << i;
-    AllocateBlocks(block_mgrs[i], clients[i], blocks_per_mgr, &mgr_blocks);
-    blocks.push_back(mgr_blocks);
-  }
-  const string& bad_dir = tmp_dirs[0];
-  const string& bad_scratch_subdir = bad_dir + SCRATCH_SUFFIX;
-  chmod(bad_scratch_subdir.c_str(), 0);
-  // The block mgr should attempt to allocate space in bad dir for one block, 
which will
-  // cause an error when it tries to create/expand the file. It should recover 
and just
-  // use the good dir.
-  UnpinBlocks(blocks[0]);
-  // Directories remain on active list even when they experience errors.
-  ASSERT_EQ(2, test_env_->tmp_file_mgr()->NumActiveTmpDevices());
-  // Blocks should not be written to bad dir even if it remains non-writable.
-  UnpinBlocks(blocks[1]);
-  // All writes should succeed.
-  WaitForWrites(block_mgrs);
-  for (int i = 0; i < blocks.size(); ++i) {
-    DeleteBlocks(blocks[i]);
-  }
-}
-
-// Test that block manager fails cleanly when all directories are inaccessible 
at runtime.
-TEST_F(BufferedBlockMgrTest, NoDirsAllocationError) {
-  vector<string> tmp_dirs = InitMultipleTmpDirs(2);
-  int max_num_buffers = 2;
-  RuntimeState* runtime_state;
-  BufferedBlockMgr::Client* client;
-  BufferedBlockMgr* block_mgr = CreateMgrAndClient(
-      0, max_num_buffers, block_size_, 0, false, &client, &runtime_state);
-  vector<BufferedBlockMgr::Block*> blocks;
-  AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
-  for (int i = 0; i < tmp_dirs.size(); ++i) {
-    const string& tmp_scratch_subdir = tmp_dirs[i] + SCRATCH_SUFFIX;
-    chmod(tmp_scratch_subdir.c_str(), 0);
-  }
-  ErrorLogMap error_log;
-  runtime_state->GetErrors(&error_log);
-  ASSERT_TRUE(error_log.empty());
-  // Unpin the blocks. Unpinning may fail if it hits a write error before this 
thread is
-  // done unpinning.
-  vector<TErrorCode::type> cancelled_code = {TErrorCode::CANCELLED};
-  UnpinBlocks(blocks, &cancelled_code);
-
-  LOG(INFO) << "Waiting for writes.";
-  // Write failure should cancel query.
-  WaitForWrites(block_mgr);
-  LOG(INFO) << "writes done.";
-  ASSERT_TRUE(block_mgr->IsCancelled());
-  runtime_state->GetErrors(&error_log);
-  ASSERT_FALSE(error_log.empty());
-  stringstream error_string;
-  PrintErrorMap(&error_string, error_log);
-  LOG(INFO) << "Errors: " << error_string.str();
-  // SCRATCH_ALLOCATION_FAILED error should exist in the error log.
-  ErrorLogMap::const_iterator it = 
error_log.find(TErrorCode::SCRATCH_ALLOCATION_FAILED);
-  ASSERT_NE(it, error_log.end());
-  ASSERT_GT(it->second.count, 0);
-  DeleteBlocks(blocks);
-}
-
-// Test that block manager can still allocate buffers when spilling is 
disabled.
-TEST_F(BufferedBlockMgrTest, NoTmpDirs) {
-  InitMultipleTmpDirs(0);
-  int max_num_buffers = 3;
-  BufferedBlockMgr::Client* client;
-  BufferedBlockMgr* block_mgr =
-      CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, &client);
-  vector<BufferedBlockMgr::Block*> blocks;
-  AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
-  DeleteBlocks(blocks);
-}
-
-// Test that block manager can still allocate buffers when spilling is 
disabled by
-// setting scratch_limit = 0.
-TEST_F(BufferedBlockMgrTest, ScratchLimitZero) {
-  int max_num_buffers = 3;
-  BufferedBlockMgr::Client* client;
-  TQueryOptions query_options;
-  query_options.scratch_limit = 0;
-  BufferedBlockMgr* block_mgr = CreateMgrAndClient(
-      0, max_num_buffers, block_size_, 0, false, &client, NULL, 
&query_options);
-  vector<BufferedBlockMgr::Block*> blocks;
-  AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
-  DeleteBlocks(blocks);
-}
-
-// Create two clients with different number of reserved buffers.
-TEST_F(BufferedBlockMgrTest, MultipleClients) {
-  int client1_buffers = 3;
-  int client2_buffers = 5;
-  int max_num_buffers = client1_buffers + client2_buffers;
-  const int block_size = 1024;
-  RuntimeState* runtime_state;
-  BufferedBlockMgr* block_mgr = CreateMgr(0, max_num_buffers, block_size, 
&runtime_state);
-
-  BufferedBlockMgr::Client* client1 = NULL;
-  BufferedBlockMgr::Client* client2 = NULL;
-  ASSERT_OK(block_mgr->RegisterClient("", client1_buffers, false,
-      NewClientTracker(runtime_state), runtime_state, &client1));
-  ASSERT_TRUE(client1 != NULL);
-  ASSERT_OK(block_mgr->RegisterClient("", client2_buffers, false,
-      NewClientTracker(runtime_state), runtime_state, &client2));
-  ASSERT_TRUE(client2 != NULL);
-
-  // Reserve client 1's and 2's buffers. They should succeed.
-  bool reserved = block_mgr->TryAcquireTmpReservation(client1, 1);
-  ASSERT_TRUE(reserved);
-  reserved = block_mgr->TryAcquireTmpReservation(client2, 1);
-  ASSERT_TRUE(reserved);
-
-  vector<BufferedBlockMgr::Block*> client1_blocks;
-  // Allocate all of client1's reserved blocks, they should all succeed.
-  AllocateBlocks(block_mgr, client1, client1_buffers, &client1_blocks);
-
-  // Try allocating one more, that should fail.
-  BufferedBlockMgr::Block* block;
-  ASSERT_OK(block_mgr->GetNewBlock(client1, NULL, &block));
-  ASSERT_TRUE(block == NULL);
-
-  // Trying to reserve should also fail.
-  reserved = block_mgr->TryAcquireTmpReservation(client1, 1);
-  ASSERT_FALSE(reserved);
-
-  // Allocate all of client2's reserved blocks, these should succeed.
-  vector<BufferedBlockMgr::Block*> client2_blocks;
-  AllocateBlocks(block_mgr, client2, client2_buffers, &client2_blocks);
-
-  // Try allocating one more from client 2, that should fail.
-  ASSERT_OK(block_mgr->GetNewBlock(client2, NULL, &block));
-  ASSERT_TRUE(block == NULL);
-
-  // Unpin one block from client 1.
-  ASSERT_OK(client1_blocks[0]->Unpin());
-
-  // Client 2 should still not be able to allocate.
-  ASSERT_OK(block_mgr->GetNewBlock(client2, NULL, &block));
-  ASSERT_TRUE(block == NULL);
-
-  // Client 2 should still not be able to reserve.
-  reserved = block_mgr->TryAcquireTmpReservation(client2, 1);
-  ASSERT_FALSE(reserved);
-
-  // Client 1 should be able to though.
-  ASSERT_OK(block_mgr->GetNewBlock(client1, NULL, &block));
-  ASSERT_TRUE(block != NULL);
-  client1_blocks.push_back(block);
-
-  // Unpin two of client 1's blocks (client 1 should have 3 unpinned blocks 
now).
-  ASSERT_OK(client1_blocks[1]->Unpin());
-  ASSERT_OK(client1_blocks[2]->Unpin());
-
-  // Clear client 1's reservation
-  block_mgr->ClearReservations(client1);
-
-  // Client 2 should be able to reserve 1 buffers now (there are 2 left);
-  reserved = block_mgr->TryAcquireTmpReservation(client2, 1);
-  ASSERT_TRUE(reserved);
-
-  // Client one can only pin 1.
-  bool pinned;
-  ASSERT_OK(client1_blocks[0]->Pin(&pinned));
-  ASSERT_TRUE(pinned);
-  // Can't get this one.
-  ASSERT_OK(client1_blocks[1]->Pin(&pinned));
-  ASSERT_FALSE(pinned);
-
-  // Client 2 can pick up the one reserved buffer
-  ASSERT_OK(block_mgr->GetNewBlock(client2, NULL, &block));
-  ASSERT_TRUE(block != NULL);
-  client2_blocks.push_back(block);
-
-  // But not a second
-  BufferedBlockMgr::Block* block2;
-  ASSERT_OK(block_mgr->GetNewBlock(client2, NULL, &block2));
-  ASSERT_TRUE(block2 == NULL);
-
-  // Unpin client 2's block it got from the reservation. Sine this is a tmp
-  // reservation, client 1 can pick it up again (it is not longer reserved).
-  ASSERT_OK(block->Unpin());
-  ASSERT_OK(client1_blocks[1]->Pin(&pinned));
-  ASSERT_TRUE(pinned);
-
-  DeleteBlocks(client1_blocks);
-  DeleteBlocks(client2_blocks);
-  TearDownMgrs();
-}
-
-// Create two clients with different number of reserved buffers and some 
additional.
-TEST_F(BufferedBlockMgrTest, MultipleClientsExtraBuffers) {
-  int client1_buffers = 1;
-  int client2_buffers = 1;
-  int max_num_buffers = client1_buffers + client2_buffers + 2;
-  const int block_size = 1024;
-  RuntimeState* runtime_state;
-  BufferedBlockMgr* block_mgr = CreateMgr(0, max_num_buffers, block_size, 
&runtime_state);
-
-  BufferedBlockMgr::Client* client1 = NULL;
-  BufferedBlockMgr::Client* client2 = NULL;
-  BufferedBlockMgr::Block* block = NULL;
-  ASSERT_OK(block_mgr->RegisterClient("", client1_buffers, false,
-      NewClientTracker(runtime_state), runtime_state, &client1));
-  ASSERT_TRUE(client1 != NULL);
-  ASSERT_OK(block_mgr->RegisterClient("", client2_buffers, false,
-      NewClientTracker(runtime_state), runtime_state, &client2));
-  ASSERT_TRUE(client2 != NULL);
-
-  vector<BufferedBlockMgr::Block*> client1_blocks;
-  // Allocate all of client1's reserved blocks, they should all succeed.
-  AllocateBlocks(block_mgr, client1, client1_buffers, &client1_blocks);
-
-  // Allocate all of client2's reserved blocks, these should succeed.
-  vector<BufferedBlockMgr::Block*> client2_blocks;
-  AllocateBlocks(block_mgr, client2, client2_buffers, &client2_blocks);
-
-  // We have two spare buffers now. Each client should be able to allocate it.
-  ASSERT_OK(block_mgr->GetNewBlock(client1, NULL, &block));
-  ASSERT_TRUE(block != NULL);
-  client1_blocks.push_back(block);
-  ASSERT_OK(block_mgr->GetNewBlock(client2, NULL, &block));
-  ASSERT_TRUE(block != NULL);
-  client2_blocks.push_back(block);
-
-  // Now we are completely full, no one should be able to allocate a new block.
-  ASSERT_OK(block_mgr->GetNewBlock(client1, NULL, &block));
-  ASSERT_TRUE(block == NULL);
-  ASSERT_OK(block_mgr->GetNewBlock(client2, NULL, &block));
-  ASSERT_TRUE(block == NULL);
-
-  DeleteBlocks(client1_blocks);
-  DeleteBlocks(client2_blocks);
-  TearDownMgrs();
-}
-
-// Create multiple clients causing oversubscription.
-TEST_F(BufferedBlockMgrTest, ClientOversubscription) {
-  Status status;
-  int client1_buffers = 1;
-  int client2_buffers = 2;
-  int client3_buffers = 2;
-  int max_num_buffers = 2;
-  const int block_size = 1024;
-  RuntimeState* runtime_state;
-  BufferedBlockMgr* block_mgr = CreateMgr(0, max_num_buffers, block_size, 
&runtime_state);
-  vector<BufferedBlockMgr::Block*> blocks;
-
-  BufferedBlockMgr::Client* client1 = NULL;
-  BufferedBlockMgr::Client* client2 = NULL;
-  BufferedBlockMgr::Client* client3 = NULL;
-  BufferedBlockMgr::Block* block = NULL;
-  ASSERT_OK(block_mgr->RegisterClient("", client1_buffers, false,
-      NewClientTracker(runtime_state), runtime_state, &client1));
-  ASSERT_TRUE(client1 != NULL);
-  ASSERT_OK(block_mgr->RegisterClient("", client2_buffers, false,
-      NewClientTracker(runtime_state), runtime_state, &client2));
-  ASSERT_TRUE(client2 != NULL);
-  ASSERT_OK(block_mgr->RegisterClient("", client3_buffers, true,
-      NewClientTracker(runtime_state), runtime_state, &client3));
-  ASSERT_TRUE(client3 != NULL);
-
-  // Client one allocates first block, should work.
-  ASSERT_OK(block_mgr->GetNewBlock(client1, NULL, &block));
-  ASSERT_TRUE(block != NULL);
-  blocks.push_back(block);
-
-  // Client two allocates first block, should work.
-  ASSERT_OK(block_mgr->GetNewBlock(client2, NULL, &block));
-  ASSERT_TRUE(block != NULL);
-  blocks.push_back(block);
-
-  // At this point we've used both buffers. Client one reserved one so 
subsequent
-  // calls should fail with no error (but returns no block).
-  ASSERT_OK(block_mgr->GetNewBlock(client1, NULL, &block));
-  ASSERT_TRUE(block == NULL);
-
-  // Allocate with client two. Since client two reserved 2 buffers, this 
should fail
-  // with MEM_LIMIT_EXCEEDED.
-  ASSERT_TRUE(block_mgr->GetNewBlock(client2, NULL, 
&block).IsMemLimitExceeded());
-
-  // Allocate with client three. Since client three can tolerate 
oversubscription,
-  // this should fail with no error even though it was a reserved request.
-  ASSERT_OK(block_mgr->GetNewBlock(client3, NULL, &block));
-  ASSERT_TRUE(block == NULL);
-
-  DeleteBlocks(blocks);
-  TearDownMgrs();
-}
-
-TEST_F(BufferedBlockMgrTest, SingleRandom_plain) {
-  FLAGS_disk_spill_encryption = false;
-  TestRandomInternalSingle(1024);
-  TestRandomInternalSingle(8 * 1024);
-  TestRandomInternalSingle(8 * 1024 * 1024);
-}
-
-TEST_F(BufferedBlockMgrTest, Multi2Random_plain) {
-  FLAGS_disk_spill_encryption = false;
-  TestRandomInternalMulti(2, 1024);
-  TestRandomInternalMulti(2, 8 * 1024);
-  TestRandomInternalMulti(2, 8 * 1024 * 1024);
-}
-
-TEST_F(BufferedBlockMgrTest, Multi4Random_plain) {
-  FLAGS_disk_spill_encryption = false;
-  TestRandomInternalMulti(4, 1024);
-  TestRandomInternalMulti(4, 8 * 1024);
-  TestRandomInternalMulti(4, 8 * 1024 * 1024);
-}
-
-// TODO: Enable when we improve concurrency/scalability of block mgr.
-// TEST_F(BufferedBlockMgrTest, Multi8Random_plain) {
-//   FLAGS_disk_spill_encryption = false;
-//   TestRandomInternalMulti(8);
-// }
-
-TEST_F(BufferedBlockMgrTest, SingleRandom_encryption) {
-  FLAGS_disk_spill_encryption = true;
-  TestRandomInternalSingle(8 * 1024);
-}
-
-TEST_F(BufferedBlockMgrTest, Multi2Random_encryption) {
-  FLAGS_disk_spill_encryption = true;
-  TestRandomInternalMulti(2, 8 * 1024);
-}
-
-TEST_F(BufferedBlockMgrTest, Multi4Random_encryption) {
-  FLAGS_disk_spill_encryption = true;
-  TestRandomInternalMulti(4, 8 * 1024);
-}
-
-// TODO: Enable when we improve concurrency/scalability of block mgr.
-// TEST_F(BufferedBlockMgrTest, Multi8Random_encryption) {
-//   FLAGS_disk_spill_encryption = true;
-//   TestRandomInternalMulti(8);
-// }
-
-
-TEST_F(BufferedBlockMgrTest, CreateDestroyMulti) {
-  CreateDestroyMulti();
-}
-
-}
-
-int main(int argc, char** argv) {
-  ::testing::InitGoogleTest(&argc, argv);
-  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
-  impala::InitFeSupport();
-  impala::LlvmCodeGen::InitializeLlvm();
-  return RUN_ALL_TESTS();
-}

Reply via email to