Repository: incubator-impala
Updated Branches:
  refs/heads/master d83783e24 -> 872d5462b


IMPALA-3671: Add query option to limit scratch space usage

Currently spilling can only be disabled via a startup option, which means
the cluster has to be restarted to change the setting.
This patch adds a new query option 'SCRATCH_LIMIT' that limits the amount of
scratch directory space that can be used. This would be useful to prevent
runaway queries or to prevent queries from spilling when that is not desired.
This also adds a 'ScratchFileUsedBytes' counter to the runtime profile of the
BlockMgr that keeps track of the scratch space allocated.

Valid values for the SCRATCH_LIMIT query option are:
- unspecified or a limit of -1 means no limit
- a limit of 0 (zero) means spilling is disabled
- an int (= number of bytes)
- a float followed by "M" (MB) or "G" (GB)

Testing:
A new test file "test_scratch_limit.py" was added for testing functionality.

Change-Id: Ibf8842626ded1345b632a0ccdb9a580e6a0ad470
Reviewed-on: http://gerrit.cloudera.org:8080/4497
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9313dcdb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9313dcdb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9313dcdb

Branch: refs/heads/master
Commit: 9313dcdb830b0cd24479ca892988d17defc9ca19
Parents: d83783e
Author: Bikramjeet Vig <[email protected]>
Authored: Wed Aug 3 11:11:54 2016 -0700
Committer: Internal Jenkins <[email protected]>
Committed: Sat Sep 24 02:48:46 2016 +0000

----------------------------------------------------------------------
 be/src/runtime/buffered-block-mgr-test.cc  |  24 ++++-
 be/src/runtime/buffered-block-mgr.cc       |  52 ++++++-----
 be/src/runtime/buffered-block-mgr.h        |  19 ++--
 be/src/runtime/test-env.cc                 |   8 +-
 be/src/runtime/test-env.h                  |  11 ++-
 be/src/runtime/tmp-file-mgr-test.cc        | 117 ++++++++++++++++--------
 be/src/runtime/tmp-file-mgr.cc             |  45 ++++++++-
 be/src/runtime/tmp-file-mgr.h              |  98 ++++++++++++++++----
 be/src/service/query-options.cc            |  12 +++
 be/src/service/query-options.h             |   5 +-
 common/thrift/ImpalaInternalService.thrift |   3 +
 common/thrift/ImpalaService.thrift         |   7 +-
 common/thrift/generate_error_codes.py      |   5 +-
 tests/query_test/test_scratch_limit.py     | 105 +++++++++++++++++++++
 14 files changed, 407 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/be/src/runtime/buffered-block-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr-test.cc 
b/be/src/runtime/buffered-block-mgr-test.cc
index 169baf2..5eb1f8c 100644
--- a/be/src/runtime/buffered-block-mgr-test.cc
+++ b/be/src/runtime/buffered-block-mgr-test.cc
@@ -141,19 +141,21 @@ class BufferedBlockMgrTest : public ::testing::Test {
 
   /// Helper to create a simple block manager.
   BufferedBlockMgr* CreateMgr(int64_t query_id, int max_buffers, int 
block_size,
-      RuntimeState** query_state = NULL) {
+      RuntimeState** query_state = NULL, TQueryOptions* query_options = NULL) {
     RuntimeState* state;
     EXPECT_TRUE(test_env_->CreateQueryState(query_id, max_buffers, block_size,
-        &state).ok());
+        &state, query_options).ok());
     if (query_state != NULL) *query_state = state;
     return state->block_mgr();
   }
 
   BufferedBlockMgr* CreateMgrAndClient(int64_t query_id, int max_buffers, int 
block_size,
       int reserved_blocks, bool tolerates_oversubscription, MemTracker* 
tracker,
-      BufferedBlockMgr::Client** client, RuntimeState** query_state = NULL) {
+      BufferedBlockMgr::Client** client, RuntimeState** query_state = NULL,
+      TQueryOptions* query_options = NULL) {
     RuntimeState* state;
-    BufferedBlockMgr* mgr = CreateMgr(query_id, max_buffers, block_size, 
&state);
+    BufferedBlockMgr* mgr = CreateMgr(query_id, max_buffers, block_size, 
&state,
+        query_options);
     EXPECT_TRUE(mgr->RegisterClient(Substitute("Client for query $0", 
query_id),
         reserved_blocks, tolerates_oversubscription, tracker, state, 
client).ok());
     EXPECT_TRUE(client != NULL);
@@ -1188,6 +1190,20 @@ TEST_F(BufferedBlockMgrTest, NoTmpDirs) {
   DeleteBlocks(blocks);
 }
 
+// Test that block manager can still allocate buffers when spilling is 
disabled by
+// setting scratch_limit = 0.
+TEST_F(BufferedBlockMgrTest, ScratchLimitZero) {
+  int max_num_buffers = 3;
+  BufferedBlockMgr::Client* client;
+  TQueryOptions query_options;
+  query_options.scratch_limit = 0;
+  BufferedBlockMgr* block_mgr = CreateMgrAndClient(0, max_num_buffers, 
block_size_,
+      0, false, client_tracker_.get(), &client, NULL, &query_options);
+  vector<BufferedBlockMgr::Block*> blocks;
+  AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
+  DeleteBlocks(blocks);
+}
+
 // Create two clients with different number of reserved buffers.
 TEST_F(BufferedBlockMgrTest, MultipleClients) {
   int client1_buffers = 3;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/be/src/runtime/buffered-block-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.cc 
b/be/src/runtime/buffered-block-mgr.cc
index d582c31..b2c3973 100644
--- a/be/src/runtime/buffered-block-mgr.cc
+++ b/be/src/runtime/buffered-block-mgr.cc
@@ -209,17 +209,19 @@ string BufferedBlockMgr::Block::DebugString() const {
 }
 
 BufferedBlockMgr::BufferedBlockMgr(RuntimeState* state, TmpFileMgr* 
tmp_file_mgr,
-    int64_t block_size)
+    int64_t block_size, int64_t scratch_limit)
   : max_block_size_(block_size),
     // Keep two writes in flight per scratch disk so the disks can stay busy.
     block_write_threshold_(tmp_file_mgr->num_active_tmp_devices() * 2),
-    disable_spill_(state->query_ctx().disable_spilling || 
block_write_threshold_ == 0),
+    disable_spill_(state->query_ctx().disable_spilling || 
block_write_threshold_ == 0 ||
+        scratch_limit == 0),
     query_id_(state->query_id()),
     tmp_file_mgr_(tmp_file_mgr),
     initialized_(false),
     unfullfilled_reserved_buffers_(0),
     total_pinned_buffers_(0),
     non_local_outstanding_writes_(0),
+    tmp_file_group(new TmpFileMgr::FileGroup(tmp_file_mgr, scratch_limit)),
     io_mgr_(state->io_mgr()),
     is_cancelled_(false),
     writes_issued_(0),
@@ -242,7 +244,8 @@ Status BufferedBlockMgr::Create(RuntimeState* state, 
MemTracker* parent,
       // all shared_ptr references have gone to 0 and it is in the process of
       // being deleted. This can happen if the last shared reference is 
released
       // but before the weak ptr is removed from the map.
-      block_mgr->reset(new BufferedBlockMgr(state, tmp_file_mgr, block_size));
+      block_mgr->reset(new BufferedBlockMgr(state, tmp_file_mgr, block_size,
+          state->query_options().scratch_limit));
       query_to_block_mgrs_[state->query_id()] = *block_mgr;
     }
   }
@@ -557,10 +560,7 @@ BufferedBlockMgr::~BufferedBlockMgr() {
   // See IMPALA-1890.
   DCHECK_EQ(non_local_outstanding_writes_, 0) << endl << DebugInternal();
   // Delete tmp files.
-  for (TmpFileMgr::File& file: tmp_files_) {
-    file.Remove();
-  }
-  tmp_files_.clear();
+  tmp_file_group->Close();
 
   // Validate that clients deleted all of their blocks. Since all writes have
   // completed at this point, any deleted blocks should be in unused_blocks_.
@@ -763,7 +763,7 @@ Status BufferedBlockMgr::WriteUnpinnedBlock(Block* block) {
   DCHECK_EQ(block->buffer_desc_->len, max_block_size_);
 
   if (block->write_range_ == NULL) {
-    if (tmp_files_.empty()) RETURN_IF_ERROR(InitTmpFiles());
+    if (tmp_file_group->NumFiles() == 0) RETURN_IF_ERROR(InitTmpFiles());
 
     // First time the block is being persisted - need to allocate tmp file 
space.
     TmpFileMgr::File* tmp_file;
@@ -823,12 +823,18 @@ Status BufferedBlockMgr::AllocateScratchSpace(int64_t 
block_size,
   // Assumes block manager lock is already taken.
   vector<Status> errs;
   // Find the next physical file in round-robin order and create a write range 
for it.
-  for (int attempt = 0; attempt < tmp_files_.size(); ++attempt) {
-    *tmp_file = &tmp_files_[next_block_index_];
-    next_block_index_ = (next_block_index_ + 1) % tmp_files_.size();
+  for (int attempt = 0; attempt < tmp_file_group->NumFiles(); ++attempt) {
+    *tmp_file = tmp_file_group->GetFileAt(next_block_index_);
+    next_block_index_ = (next_block_index_ + 1) % tmp_file_group->NumFiles();
     if ((*tmp_file)->is_blacklisted()) continue;
-    Status status = (*tmp_file)->AllocateSpace(max_block_size_, file_offset);
-    if (status.ok()) return Status::OK();
+    Status status = (*tmp_file)->AllocateSpace(block_size, file_offset);
+    if (status.ok()) {
+      scratch_space_bytes_used_counter_->Add(block_size);
+      return Status::OK();
+    } else if (status.code() == TErrorCode::SCRATCH_LIMIT_EXCEEDED) {
+      // We cannot allocate from any files if we're at the scratch limit.
+      return status;
+    }
     // Log error and try other files if there was a problem. Problematic files 
will be
     // blacklisted so we will not repeatedly log the same error.
     LOG(WARNING) << "Error while allocating range in scratch file '"
@@ -1315,6 +1321,8 @@ void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, 
RuntimeProfile* parent_profile,
   buffer_wait_timer_ = ADD_TIMER(profile_.get(), "TotalBufferWaitTime");
   encryption_timer_ = ADD_TIMER(profile_.get(), "TotalEncryptionTime");
   integrity_check_timer_ = ADD_TIMER(profile_.get(), 
"TotalIntegrityCheckTime");
+  scratch_space_bytes_used_counter_ =
+    ADD_COUNTER(profile_.get(), "ScratchFileUsedBytes", TUnit::BYTES);
 
   // Create a new mem_tracker and allocate buffers.
   mem_tracker_.reset(
@@ -1324,25 +1332,25 @@ void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, 
RuntimeProfile* parent_profile,
 }
 
 Status BufferedBlockMgr::InitTmpFiles() {
-  DCHECK(tmp_files_.empty());
+  DCHECK(tmp_file_group->NumFiles() == 0);
   DCHECK(tmp_file_mgr_ != NULL);
 
   vector<TmpFileMgr::DeviceId> tmp_devices = 
tmp_file_mgr_->active_tmp_devices();
+  int files_allocated = 0;
   // Initialize the tmp files and the initial file to use.
-  tmp_files_.reserve(tmp_devices.size());
   for (int i = 0; i < tmp_devices.size(); ++i) {
-    TmpFileMgr::File* tmp_file;
     TmpFileMgr::DeviceId tmp_device_id = tmp_devices[i];
-    // It is possible for a device to be blacklisted after it was returned
-    // by active_tmp_devices() - handle this gracefully.
-    Status status = tmp_file_mgr_->GetFile(tmp_device_id, query_id_, 
&tmp_file);
-    if (status.ok()) tmp_files_.push_back(tmp_file);
+    // It is possible for a device to be blacklisted after it was returned by
+    // active_tmp_devices(), handle this gracefully by ignoring the return 
status of
+    // NewFile().
+    if (tmp_file_group->NewFile(tmp_device_id, query_id_).ok()) 
++files_allocated;
   }
-  if (tmp_files_.empty()) {
+  DCHECK_EQ(tmp_file_group->NumFiles(), files_allocated);
+  if (tmp_file_group->NumFiles() == 0) {
     return Status("No spilling directories configured. Cannot spill. Set 
--scratch_dirs"
         " or see log for previous errors that prevented use of provided 
directories");
   }
-  next_block_index_ = rand() % tmp_files_.size();
+  next_block_index_ = rand() % tmp_file_group->NumFiles();
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/be/src/runtime/buffered-block-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.h 
b/be/src/runtime/buffered-block-mgr.h
index ac707bc..ad8ad85 100644
--- a/be/src/runtime/buffered-block-mgr.h
+++ b/be/src/runtime/buffered-block-mgr.h
@@ -423,7 +423,8 @@ class BufferedBlockMgr {
     }
   };
 
-  BufferedBlockMgr(RuntimeState* state, TmpFileMgr* tmp_file_mgr, int64_t 
block_size);
+  BufferedBlockMgr(RuntimeState* state, TmpFileMgr* tmp_file_mgr,
+      int64_t block_size, int64_t scratch_limit);
 
   /// Initializes the block mgr. Idempotent and thread-safe.
   void Init(DiskIoMgr* io_mgr, RuntimeProfile* profile,
@@ -492,7 +493,8 @@ class BufferedBlockMgr {
   void WaitForWrite(boost::unique_lock<boost::mutex>& lock, Block* block);
 
   /// Allocate block_size bytes in a temporary file. Try multiple disks if 
error occurs.
-  /// Returns an error only if no temporary files are usable.
+  /// Returns an error only if no temporary files are usable or the scratch 
limit is
+  /// exceeded.
   Status AllocateScratchSpace(int64_t block_size, TmpFileMgr::File** tmp_file,
       int64_t* file_offset);
 
@@ -590,12 +592,12 @@ class BufferedBlockMgr {
   /// All allocated io-sized buffers.
   std::list<BufferDescriptor*> all_io_buffers_;
 
-  /// Temporary physical file handle, (one per tmp device) to which blocks may 
be written.
-  /// Blocks are round-robined across these files.
-  boost::ptr_vector<TmpFileMgr::File> tmp_files_;
+  /// Group of temporary physical files, (one per tmp device) to which
+  /// blocks may be written. Blocks are round-robined across these files.
+  boost::scoped_ptr<TmpFileMgr::FileGroup> tmp_file_group;
 
-  /// Index into tmp_files_ denoting the file to which the next block to be 
persisted will
-  /// be written.
+  /// Index into 'tmp_file_group_' denoting the file to which the next block 
will be
+  /// written.
   int next_block_index_;
 
   /// DiskIoMgr handles to read and write blocks.
@@ -641,6 +643,9 @@ class BufferedBlockMgr {
   /// Time spent in disk spill integrity generation and checking.
   RuntimeProfile::Counter* integrity_check_timer_;
 
+  /// Amount of scratch space allocated in bytes.
+  RuntimeProfile::Counter* scratch_space_bytes_used_counter_;
+
   /// Number of writes issued.
   int writes_issued_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 8690e39..c5a9a41 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -66,16 +66,18 @@ TestEnv::~TestEnv() {
   metrics_.reset();
 }
 
-RuntimeState* TestEnv::CreateRuntimeState(int64_t query_id) {
+RuntimeState* TestEnv::CreateRuntimeState(int64_t query_id,
+    TQueryOptions* query_options) {
   TExecPlanFragmentParams plan_params = TExecPlanFragmentParams();
+  if (query_options != NULL) plan_params.query_ctx.request.query_options = 
*query_options;
   plan_params.query_ctx.query_id.hi = 0;
   plan_params.query_ctx.query_id.lo = query_id;
   return new RuntimeState(plan_params, exec_env_.get());
 }
 
 Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int 
block_size,
-    RuntimeState** runtime_state) {
-  *runtime_state = CreateRuntimeState(query_id);
+    RuntimeState** runtime_state, TQueryOptions* query_options) {
+  *runtime_state = CreateRuntimeState(query_id, query_options);
   if (*runtime_state == NULL) {
     return Status("Unexpected error creating RuntimeState");
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/be/src/runtime/test-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h
index bfbe935..a3ab29a 100644
--- a/be/src/runtime/test-env.h
+++ b/be/src/runtime/test-env.h
@@ -37,10 +37,10 @@ class TestEnv {
   /// query states have been created.
   void InitTmpFileMgr(const std::vector<std::string>& tmp_dirs, bool 
one_dir_per_device);
 
-  /// Create a RuntimeState for a query with a new block manager. The 
RuntimeState is
-  /// owned by the TestEnv.
+  /// Create a RuntimeState for a query with a new block manager and the given 
query
+  /// options. The RuntimeState is owned by the TestEnv.
   Status CreateQueryState(int64_t query_id, int max_buffers, int block_size,
-      RuntimeState** runtime_state);
+      RuntimeState** runtime_state, TQueryOptions* query_options = NULL);
 
   /// Create multiple separate RuntimeStates with associated block managers, 
e.g. as if
   /// multiple queries were executing. The RuntimeStates are owned by TestEnv.
@@ -65,8 +65,9 @@ class TestEnv {
   /// Recreate global metric groups.
   void InitMetrics();
 
-  /// Create a new RuntimeState sharing global environment.
-  RuntimeState* CreateRuntimeState(int64_t query_id);
+  /// Create a new RuntimeState sharing global environment with given query 
options
+  RuntimeState* CreateRuntimeState(int64_t query_id,
+      TQueryOptions* query_options = NULL);
 
   /// Global state for test environment.
   static boost::scoped_ptr<MetricGroup> static_metrics_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc 
b/be/src/runtime/tmp-file-mgr-test.cc
index 9f47018..523dcff 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -24,6 +24,7 @@
 #include "common/init.h"
 #include "runtime/tmp-file-mgr.h"
 #include "service/fe-support.h"
+#include "testutil/test-macros.h"
 #include "util/filesystem-util.h"
 #include "util/metrics.h"
 
@@ -62,6 +63,12 @@ class TmpFileMgrTest : public ::testing::Test {
     }
   }
 
+  void RemoveAndCreateDirs(const vector<string>& dirs) {
+    for (const string& dir: dirs) {
+      ASSERT_OK(FileSystemUtil::RemoveAndCreateDirectory(dir));
+    }
+  }
+
   scoped_ptr<MetricGroup> metrics_;
 };
 
@@ -69,15 +76,15 @@ class TmpFileMgrTest : public ::testing::Test {
 /// at the expected file offsets and expands the temporary file to the correct 
size.
 TEST_F(TmpFileMgrTest, TestFileAllocation) {
   TmpFileMgr tmp_file_mgr;
-  EXPECT_TRUE(tmp_file_mgr.Init(metrics_.get()).ok());
+  ASSERT_OK(tmp_file_mgr.Init(metrics_.get()));
+  TmpFileMgr::FileGroup file_group(&tmp_file_mgr);
   // Default configuration should give us one temporary device.
   EXPECT_EQ(1, tmp_file_mgr.num_active_tmp_devices());
   vector<TmpFileMgr::DeviceId> tmp_devices = tmp_file_mgr.active_tmp_devices();
   EXPECT_EQ(1, tmp_devices.size());
   TUniqueId id;
   TmpFileMgr::File *file;
-  Status status = tmp_file_mgr.GetFile(tmp_devices[0], id, &file);
-  EXPECT_TRUE(status.ok());
+  ASSERT_OK(file_group.NewFile(tmp_devices[0], id, &file));
   EXPECT_TRUE(file != NULL);
   // Apply writes of variable sizes and check space was allocated correctly.
   int64_t write_sizes[] = {
@@ -87,15 +94,13 @@ TEST_F(TmpFileMgrTest, TestFileAllocation) {
   int64_t next_offset = 0;
   for (int i = 0; i < num_write_sizes; ++i) {
     int64_t offset;
-    status = file->AllocateSpace(write_sizes[i], &offset);
-    EXPECT_TRUE(status.ok());
+    ASSERT_OK(file->AllocateSpace(write_sizes[i], &offset));
     EXPECT_EQ(next_offset, offset);
     next_offset = offset + write_sizes[i];
     EXPECT_EQ(next_offset, boost::filesystem::file_size(file->path()));
   }
   // Check that cleanup is correct.
-  status = file->Remove();
-  EXPECT_TRUE(status.ok());
+  file_group.Close();
   EXPECT_FALSE(boost::filesystem::exists(file->path()));
   CheckMetrics(&tmp_file_mgr);
 }
@@ -103,14 +108,11 @@ TEST_F(TmpFileMgrTest, TestFileAllocation) {
 /// Test that we can do initialization with two directories on same device and
 /// that validations prevents duplication of directories.
 TEST_F(TmpFileMgrTest, TestOneDirPerDevice) {
-  vector<string> tmp_dirs;
-  tmp_dirs.push_back("/tmp/tmp-file-mgr-test.1");
-  tmp_dirs.push_back("/tmp/tmp-file-mgr-test.2");
-  for (int i = 0; i < tmp_dirs.size(); ++i) {
-    EXPECT_TRUE(FileSystemUtil::RemoveAndCreateDirectory(tmp_dirs[i]).ok());
-  }
+  vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", 
"/tmp/tmp-file-mgr-test.2"});
+  RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
   tmp_file_mgr.InitCustom(tmp_dirs, true, metrics_.get());
+  TmpFileMgr::FileGroup file_group(&tmp_file_mgr);
 
   // Only the first directory should be used.
   EXPECT_EQ(1, tmp_file_mgr.num_active_tmp_devices());
@@ -118,23 +120,21 @@ TEST_F(TmpFileMgrTest, TestOneDirPerDevice) {
   EXPECT_EQ(1, devices.size());
   TUniqueId id;
   TmpFileMgr::File *file;
-  EXPECT_TRUE(tmp_file_mgr.GetFile(devices[0], id, &file).ok());
+  ASSERT_OK(file_group.NewFile(devices[0], id, &file));
   // Check the prefix is the expected temporary directory.
   EXPECT_EQ(0, file->path().find(tmp_dirs[0]));
   FileSystemUtil::RemovePaths(tmp_dirs);
+  file_group.Close();
   CheckMetrics(&tmp_file_mgr);
 }
 
 /// Test that we can do custom initialization with two dirs on same device.
 TEST_F(TmpFileMgrTest, TestMultiDirsPerDevice) {
-  vector<string> tmp_dirs;
-  tmp_dirs.push_back("/tmp/tmp-file-mgr-test.1");
-  tmp_dirs.push_back("/tmp/tmp-file-mgr-test.2");
-  for (int i = 0; i < tmp_dirs.size(); ++i) {
-    EXPECT_TRUE(FileSystemUtil::RemoveAndCreateDirectory(tmp_dirs[i]).ok());
-  }
+  vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", 
"/tmp/tmp-file-mgr-test.2"});
+  RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
   tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get());
+  TmpFileMgr::FileGroup file_group(&tmp_file_mgr);
 
   // Both directories should be used.
   EXPECT_EQ(2, tmp_file_mgr.num_active_tmp_devices());
@@ -144,25 +144,23 @@ TEST_F(TmpFileMgrTest, TestMultiDirsPerDevice) {
     EXPECT_EQ(0, tmp_file_mgr.GetTmpDirPath(devices[i]).find(tmp_dirs[i]));
     TUniqueId id;
     TmpFileMgr::File *file;
-    EXPECT_TRUE(tmp_file_mgr.GetFile(devices[i], id, &file).ok());
+    ASSERT_OK(file_group.NewFile(devices[i], id, &file));
     // Check the prefix is the expected temporary directory.
     EXPECT_EQ(0, file->path().find(tmp_dirs[i]));
   }
   FileSystemUtil::RemovePaths(tmp_dirs);
+  file_group.Close();
   CheckMetrics(&tmp_file_mgr);
 }
 
 /// Test that reporting a write error is possible but does not result in
 /// blacklisting, which is disabled.
 TEST_F(TmpFileMgrTest, TestReportError) {
-  vector<string> tmp_dirs;
-  tmp_dirs.push_back("/tmp/tmp-file-mgr-test.1");
-  tmp_dirs.push_back("/tmp/tmp-file-mgr-test.2");
-  for (int i = 0; i < tmp_dirs.size(); ++i) {
-    EXPECT_TRUE(FileSystemUtil::RemoveAndCreateDirectory(tmp_dirs[i]).ok());
-  }
+  vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", 
"/tmp/tmp-file-mgr-test.2"});
+  RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
   tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get());
+  TmpFileMgr::FileGroup file_group(&tmp_file_mgr);
 
   // Both directories should be used.
   vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.active_tmp_devices();
@@ -173,7 +171,7 @@ TEST_F(TmpFileMgrTest, TestReportError) {
   TUniqueId id;
   int good_device = 0, bad_device = 1;
   TmpFileMgr::File* bad_file;
-  EXPECT_TRUE(tmp_file_mgr.GetFile(devices[bad_device], id, &bad_file).ok());
+  ASSERT_OK(file_group.NewFile(devices[bad_device], id, &bad_file));
   ErrorMsg errmsg(TErrorCode::GENERAL, "A fake error");
   bad_file->ReportIOError(errmsg);
 
@@ -187,34 +185,35 @@ TEST_F(TmpFileMgrTest, TestReportError) {
 
   // Attempts to expand bad file should succeed.
   int64_t offset;
-  EXPECT_TRUE(bad_file->AllocateSpace(128, &offset).ok());
-  EXPECT_TRUE(bad_file->Remove().ok());
+  ASSERT_OK(bad_file->AllocateSpace(128, &offset));
   // The good device should still be usable.
   TmpFileMgr::File* good_file;
-  EXPECT_TRUE(tmp_file_mgr.GetFile(devices[good_device], id, &good_file).ok());
+  ASSERT_OK(file_group.NewFile(devices[good_device], id, &good_file));
   EXPECT_TRUE(good_file != NULL);
-  EXPECT_TRUE(good_file->AllocateSpace(128, &offset).ok());
+  ASSERT_OK(good_file->AllocateSpace(128, &offset));
   // Attempts to allocate new files on bad device should succeed.
-  EXPECT_TRUE(tmp_file_mgr.GetFile(devices[bad_device], id, &bad_file).ok());
+  ASSERT_OK(file_group.NewFile(devices[bad_device], id, &bad_file));
   FileSystemUtil::RemovePaths(tmp_dirs);
+  file_group.Close();
   CheckMetrics(&tmp_file_mgr);
 }
 
 TEST_F(TmpFileMgrTest, TestAllocateFails) {
   string tmp_dir("/tmp/tmp-file-mgr-test.1");
   string scratch_subdir = tmp_dir + "/impala-scratch";
-  vector<string> tmp_dirs(1, tmp_dir);
-  EXPECT_TRUE(FileSystemUtil::RemoveAndCreateDirectory(tmp_dir).ok());
+  vector<string> tmp_dirs({tmp_dir});
+  RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
   tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get());
+  TmpFileMgr::FileGroup file_group(&tmp_file_mgr);
 
   TUniqueId id;
   TmpFileMgr::File* allocated_file1;
   TmpFileMgr::File* allocated_file2;
   int64_t offset;
-  EXPECT_TRUE(tmp_file_mgr.GetFile(0, id, &allocated_file1).ok());
-  EXPECT_TRUE(tmp_file_mgr.GetFile(0, id, &allocated_file2).ok());
-  EXPECT_TRUE(allocated_file1->AllocateSpace(1, &offset).ok());
+  ASSERT_OK(file_group.NewFile(0, id, &allocated_file1));
+  ASSERT_OK(file_group.NewFile(0, id, &allocated_file2));
+  ASSERT_OK(allocated_file1->AllocateSpace(1, &offset));
 
   // Make scratch non-writable and test for allocation errors at different 
stages:
   // new file creation, files with no allocated blocks. files with allocated 
space.
@@ -225,10 +224,50 @@ TEST_F(TmpFileMgrTest, TestAllocateFails) {
   EXPECT_FALSE(allocated_file2->AllocateSpace(1, &offset).ok());
   // Creating a new File object can succeed because it is not immediately 
created on disk.
   TmpFileMgr::File* unallocated_file;
-  EXPECT_TRUE(tmp_file_mgr.GetFile(0, id, &unallocated_file).ok());
+  ASSERT_OK(file_group.NewFile(0, id, &unallocated_file));
 
   chmod(scratch_subdir.c_str(), S_IRWXU);
   FileSystemUtil::RemovePaths(tmp_dirs);
+  file_group.Close();
+}
+
+// Test scratch limit is applied correctly to group of files.
+TEST_F(TmpFileMgrTest, TestScratchLimit) {
+  vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", 
"/tmp/tmp-file-mgr-test.2"});
+  RemoveAndCreateDirs(tmp_dirs);
+  TmpFileMgr tmp_file_mgr;
+  tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get());
+
+  const int64_t LIMIT = 100;
+  const int64_t FILE1_ALLOC = 25;
+  const int64_t FILE2_ALLOC = LIMIT - FILE1_ALLOC;
+  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, LIMIT);
+  TmpFileMgr::File* file1;
+  TmpFileMgr::File* file2;
+  TUniqueId id;
+  ASSERT_OK(file_group.NewFile(0, id, &file1));
+  ASSERT_OK(file_group.NewFile(1, id, &file2));
+
+  // Test individual limit is enforced.
+  Status status;
+  int64_t offset;
+  status = file1->AllocateSpace(LIMIT + 1, &offset);
+  ASSERT_FALSE(status.ok());
+  ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED);
+  ASSERT_OK(file1->AllocateSpace(FILE1_ALLOC, &offset));
+  ASSERT_EQ(0, offset);
+
+  // Test aggregate limit is enforced.
+  status = file2->AllocateSpace(FILE2_ALLOC + 1, &offset);
+  ASSERT_FALSE(status.ok());
+  ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED);
+  ASSERT_OK(file2->AllocateSpace(FILE2_ALLOC, &offset));
+  ASSERT_EQ(0, offset);
+  status = file2->AllocateSpace(1, &offset);
+  ASSERT_FALSE(status.ok());
+  ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED);
+
+  file_group.Close();
 }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 73596ab..b616d18 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -136,11 +136,12 @@ Status TmpFileMgr::InitCustom(const vector<string>& 
tmp_dirs, bool one_dir_per_d
   return Status::OK();
 }
 
-Status TmpFileMgr::GetFile(const DeviceId& device_id, const TUniqueId& 
query_id,
-    File** new_file) {
+Status TmpFileMgr::NewFile(FileGroup* file_group, const DeviceId& device_id,
+    const TUniqueId& query_id, File** new_file) {
   DCHECK(initialized_);
   DCHECK_GE(device_id, 0);
   DCHECK_LT(device_id, tmp_dirs_.size());
+  DCHECK(file_group != NULL);
   if (IsBlacklisted(device_id)) {
     return Status(TErrorCode::TMP_DEVICE_BLACKLISTED, 
tmp_dirs_[device_id].path());
   }
@@ -152,7 +153,7 @@ Status TmpFileMgr::GetFile(const DeviceId& device_id, const 
TUniqueId& query_id,
   path new_file_path(tmp_dirs_[device_id].path());
   new_file_path /= file_name.str();
 
-  *new_file = new File(this, device_id, new_file_path.string());
+  *new_file = new File(this, file_group, device_id, new_file_path.string());
   return Status::OK();
 }
 
@@ -209,16 +210,24 @@ vector<TmpFileMgr::DeviceId> 
TmpFileMgr::active_tmp_devices() {
   return devices;
 }
 
-TmpFileMgr::File::File(TmpFileMgr* mgr, DeviceId device_id, const string& path)
+TmpFileMgr::File::File(TmpFileMgr* mgr, FileGroup* file_group, DeviceId 
device_id,
+    const string& path)
   : mgr_(mgr),
+    file_group_(file_group),
     path_(path),
     device_id_(device_id),
     current_size_(0),
     blacklisted_(false) {
+  DCHECK(file_group != NULL);
 }
 
 Status TmpFileMgr::File::AllocateSpace(int64_t write_size, int64_t* offset) {
   DCHECK_GT(write_size, 0);
+  if (file_group_->bytes_limit_ != -1 &&
+      file_group_->current_bytes_allocated_ + write_size
+      > file_group_->bytes_limit_) {
+    return Status(TErrorCode::SCRATCH_LIMIT_EXCEEDED, 
file_group_->bytes_limit_);
+  }
   Status status;
   if (mgr_->IsBlacklisted(device_id_)) {
     blacklisted_ = true;
@@ -240,6 +249,7 @@ Status TmpFileMgr::File::AllocateSpace(int64_t write_size, 
int64_t* offset) {
     return status;
   }
   *offset = current_size_;
+  file_group_->current_bytes_allocated_ += write_size;
   current_size_ = new_size;
   return Status::OK();
 }
@@ -256,4 +266,31 @@ Status TmpFileMgr::File::Remove() {
   return Status::OK();
 }
 
+TmpFileMgr::FileGroup::FileGroup(TmpFileMgr* tmp_file_mgr, int64_t bytes_limit)
+  : tmp_file_mgr_(tmp_file_mgr),
+    current_bytes_allocated_(0),
+    bytes_limit_(bytes_limit) {
+  DCHECK(tmp_file_mgr != NULL);
+}
+
+Status TmpFileMgr::FileGroup::NewFile(const DeviceId& device_id,
+    const TUniqueId& query_id, File** new_file) {
+  TmpFileMgr::File* tmp_file;
+  RETURN_IF_ERROR(tmp_file_mgr_->NewFile(this, device_id, query_id, 
&tmp_file));
+  tmp_files_.emplace_back(tmp_file);
+  if (new_file != NULL) *new_file = tmp_file;
+  return Status::OK();
+}
+
+void TmpFileMgr::FileGroup::Close() {
+  for (std::unique_ptr<TmpFileMgr::File>& file: tmp_files_) {
+    Status status = file->Remove();
+    if (!status.ok()) {
+      LOG(WARNING) << "Error removing scratch file '" << file->path() << "': "
+                   << status.msg().msg();
+    }
+  }
+  tmp_files_.clear();
+}
+
 } //namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/be/src/runtime/tmp-file-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index d085f2c..bd6c366 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -28,12 +28,20 @@ namespace impala {
 /// TmpFileMgr creates and manages temporary files and directories on the local
 /// filesystem. It can manage multiple temporary directories across multiple 
devices.
 /// TmpFileMgr ensures that at most one directory per device is used unless 
overridden
-/// for testing. GetFile() returns a File handle with a unique filename on a 
device. The
-/// client owns the File handle and can use it to expand the file.
+/// for testing.
+///
+/// Every temporary File belongs to a FileGroup: to allocate temporary files, 
first a
+/// FileGroup is created, then FileGroup::NewFile() is called to create a new 
File with
+/// a unique filename on the specified temporary device. The client can use 
the File
+/// handle to allocate space in the file. FileGroups can be created with a 
limit on
+/// the total number of bytes allocated across all files in the group.
+///
 /// TODO: we could notify block managers about the failure so they can more 
take
 /// proactive action to avoid using the device.
 class TmpFileMgr {
  public:
+  class FileGroup;
+
   /// DeviceId is a unique identifier for a temporary device managed by 
TmpFileMgr.
   /// It is used as a handle for external classes to identify devices.
   typedef int DeviceId;
@@ -43,28 +51,31 @@ class TmpFileMgr {
   /// Creation of the file is deferred until the first call to AllocateSpace().
   class File {
    public:
-    /// Allocates 'write_size' bytes in this file for a new block of data.
+    /// Allocates 'write_size' bytes in this file for a new block of data only 
if it
+    /// does not cross the allocation limit of its associated FileGroup.
     /// The file size is increased by a call to truncate() if necessary.
     /// The physical file is created on the first call to AllocateSpace().
     /// Returns Status::OK() and sets offset on success.
-    /// Returns an error status if an unexpected error occurs.
+    /// Returns an error status if an unexpected error occurs or if allowing 
the
+    /// allocation would exceed the allocation limit of its associated 
FileGroup.
     /// If an error status is returned, the caller can try a different 
temporary file.
     Status AllocateSpace(int64_t write_size, int64_t* offset);
 
     /// Called to notify TmpFileMgr that an IO error was encountered for this 
file
     void ReportIOError(const ErrorMsg& msg);
 
-    /// Delete the physical file on disk, if one was created.
-    /// It is not valid to read or write to a file after calling Remove().
-    Status Remove();
-
     const std::string& path() const { return path_; }
     int disk_id() const { return disk_id_; }
     bool is_blacklisted() const { return blacklisted_; }
 
    private:
+    friend class FileGroup;
     friend class TmpFileMgr;
 
+    /// Delete the physical file on disk, if one was created.
+    /// It is not valid to read or write to a file after calling Remove().
+    Status Remove();
+
     /// The name of the sub-directory that Impala created within each 
configured scratch
     /// directory.
     const static std::string TMP_SUB_DIR_NAME;
@@ -73,11 +84,15 @@ class TmpFileMgr {
     /// directory. A warning is issued if available space is less than this 
threshold.
     const static uint64_t AVAILABLE_SPACE_THRESHOLD_MB;
 
-    File(TmpFileMgr* mgr, DeviceId device_id, const std::string& path);
+    File(TmpFileMgr* mgr, FileGroup* file_group, DeviceId device_id,
+        const std::string& path);
 
     /// TmpFileMgr this belongs to.
     TmpFileMgr* mgr_;
 
+    /// The FileGroup this belongs to. Cannot be null.
+    FileGroup* file_group_;
+
     /// Path of the physical file in the filesystem.
     std::string path_;
 
@@ -97,6 +112,56 @@ class TmpFileMgr {
     bool blacklisted_;
   };
 
+  /// Represents a group of files. The total allocated bytes of the group can 
be bound by
+  /// setting the space allocation limit. The owner of the FileGroup object is
+  /// responsible for calling the Close method to delete all the files in the 
group.
+  class FileGroup {
+  public:
+    FileGroup(TmpFileMgr* tmp_file_mgr, int64_t bytes_limit = -1);
+
+    ~FileGroup(){
+      DCHECK_EQ(NumFiles(), 0);
+    }
+
+    /// Creates a new File with a unique path for a query instance, adds it to 
the
+    /// group and returns a handle for that file. The file path is within the 
(single)
+    /// tmp directory on the specified device id.
+    /// If an error is encountered, e.g. the device is blacklisted, the file 
is not
+    /// added to this group and a non-ok status is returned.
+    Status NewFile(const DeviceId& device_id, const TUniqueId& query_id,
+        File** new_file = NULL);
+
+    /// Returns a file handle at the specified index in the group.
+    File* GetFileAt(int index) {
+      DCHECK_GE(index, 0);
+      DCHECK_LT(index, NumFiles());
+      return tmp_files_[index].get();
+    }
+
+    /// Calls Remove() on all the files in the group and deletes them.
+    void Close();
+
+    /// Returns the number of files that are a part of the group.
+    int NumFiles() {
+      return tmp_files_.size();
+    }
+
+  private:
+    friend class File;
+
+    /// The TmpFileMgr it is associated with.
+    TmpFileMgr* tmp_file_mgr_;
+
+    /// List of files representing the FileGroup.
+    std::vector<std::unique_ptr<File>> tmp_files_;
+
+    /// Total space allocated in this group's files.
+    int64_t current_bytes_allocated_;
+
+    /// Max write space allowed (-1 means no limit).
+    int64_t bytes_limit_;
+  };
+
   TmpFileMgr();
 
   /// Creates the configured tmp directories. If multiple directories are 
specified per
@@ -109,13 +174,6 @@ class TmpFileMgr {
   Status InitCustom(const std::vector<std::string>& tmp_dirs, bool 
one_dir_per_device,
       MetricGroup* metrics);
 
-  /// Return a new File handle with a unique path for a query instance. The 
file path
-  /// is within the (single) tmp directory on the specified device id. The 
caller owns
-  /// the returned handle and is responsible for deleting it. The file is not 
created -
-  /// creation is deferred until the first call to File::AllocateSpace().
-  Status GetFile(const DeviceId& device_id, const TUniqueId& query_id,
-      File** new_file);
-
   /// Return the scratch directory path for the device.
   std::string GetTmpDirPath(DeviceId device_id) const;
 
@@ -128,6 +186,14 @@ class TmpFileMgr {
   std::vector<DeviceId> active_tmp_devices();
 
  private:
+  /// Return a new File handle with a unique path for a query instance. The 
file is
+  /// associated with the file_group and the file path is within the (single) 
tmp
+  /// directory on the specified device id. The caller owns the returned 
handle and is
+  /// responsible for deleting it. The file is not created - creation is 
deferred until
+  /// the first call to File::AllocateSpace().
+  Status NewFile(FileGroup* file_group, const DeviceId& device_id,
+      const TUniqueId& query_id, File** new_file);
+
   /// Dir stores information about a temporary directory.
   class Dir {
    public:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index ec1e632..d0e4275 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -428,6 +428,18 @@ Status impala::SetQueryOption(const string& key, const 
string& value,
             iequals(value, "true") || iequals(value, "1"));
         break;
       }
+      case TImpalaQueryOptions::SCRATCH_LIMIT: {
+        // Parse the scratch limit spec and validate it.
+        if (iequals(value, "-1")) {
+          query_options->__set_scratch_limit(-1);
+        } else {
+          int64_t bytes_limit;
+          RETURN_IF_ERROR(ParseMemValue(value, "Scratch space memory limit",
+              &bytes_limit));
+          query_options->__set_scratch_limit(bytes_limit);
+        }
+        break;
+      }
       default:
         // We hit this DCHECK(false) if we forgot to add the corresponding 
entry here
         // when we add a new query option.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 4a3b199..2c25700 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -35,7 +35,7 @@ class TQueryOptions;
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::STRICT_MODE + 1);\
+      TImpalaQueryOptions::SCRATCH_LIMIT + 1);\
   QUERY_OPT_FN(abort_on_default_limit_exceeded, 
ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\
   QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -84,7 +84,8 @@ class TQueryOptions;
   QUERY_OPT_FN(runtime_filter_min_size, RUNTIME_FILTER_MIN_SIZE)\
   QUERY_OPT_FN(runtime_filter_max_size, RUNTIME_FILTER_MAX_SIZE)\
   QUERY_OPT_FN(prefetch_mode, PREFETCH_MODE)\
-  QUERY_OPT_FN(strict_mode, STRICT_MODE);
+  QUERY_OPT_FN(strict_mode, STRICT_MODE)\
+  QUERY_OPT_FN(scratch_limit, SCRATCH_LIMIT);
 
 
 /// Converts a TQueryOptions struct into a map of key, value pairs.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift 
b/common/thrift/ImpalaInternalService.thrift
index 003a618..fdb8aa8 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -203,6 +203,9 @@ struct TQueryOptions {
 
   // Additional strict handling of invalid data parsing and type conversions.
   49: optional bool strict_mode = false
+
+  // Limit on scratch directory space that can be used, in bytes; -1 means no limit.
+  50: optional i64 scratch_limit = -1
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift 
b/common/thrift/ImpalaService.thrift
index 25dfddf..ae00ea2 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -233,7 +233,12 @@ enum TImpalaQueryOptions {
   PREFETCH_MODE,
 
   // Additional strict handling of invalid data parsing and type conversions.
-  STRICT_MODE
+  STRICT_MODE,
+
+  // A limit on the amount of scratch directory space that can be used;
+  // Unspecified or a limit of -1 means no limit;
+  // Otherwise specified in the same way as MEM_LIMIT.
+  SCRATCH_LIMIT
 }
 
 // The summary of an insert.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py 
b/common/thrift/generate_error_codes.py
index 61b478b..3d48005 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -280,7 +280,10 @@ error_codes = (
    "data of type $1: $2"),
 
   ("TEXT_PARSER_TRUNCATED_COLUMN", 90, "Length of column is $0 which exceeds 
maximum "
-   "supported length of 2147483647 bytes.")
+   "supported length of 2147483647 bytes."),
+
+  ("SCRATCH_LIMIT_EXCEEDED", 91, "Scratch space limit of $0 bytes exceeded for 
query "
+   "while spilling data to disk.")
 )
 
 import sys

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/tests/query_test/test_scratch_limit.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scratch_limit.py 
b/tests/query_test/test_scratch_limit.py
new file mode 100644
index 0000000..f2cec78
--- /dev/null
+++ b/tests/query_test/test_scratch_limit.py
@@ -0,0 +1,105 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.test_dimensions import create_single_exec_option_dimension
+from tests.common.test_dimensions import create_uncompressed_text_dimension
+
+class TestScratchLimit(ImpalaTestSuite):
+  """
+  This class tests the functionality of setting the scratch limit as a query 
option
+  """
+
+  spill_query = """
+      select o_orderdate, o_custkey, o_comment
+      from tpch.orders
+      order by o_orderdate
+      """
+
+  # Block manager memory limit that is low enough to
+  # force Impala to spill to disk when executing 'spill_query'
+  max_block_mgr_memory = "64m"
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestScratchLimit, cls).add_test_dimensions()
+    # There is no reason to run these tests using all dimensions.
+    cls.TestMatrix.add_dimension(create_single_exec_option_dimension())
+    cls.TestMatrix.add_dimension(
+        create_uncompressed_text_dimension(cls.get_workload()))
+
+  def test_with_high_scratch_limit(self, vector):
+    """
+    Query runs to completion with a scratch limit well above
+    its required scratch space which in this case is 128m.
+    """
+    exec_option = vector.get_value('exec_option')
+    exec_option['max_block_mgr_memory'] = self.max_block_mgr_memory
+    exec_option['scratch_limit'] = '500m'
+    self.execute_query_expect_success(self.client, self.spill_query, 
exec_option)
+
+  def test_with_low_scratch_limit(self, vector):
+    """
+    Query throws the appropriate exception with a scratch limit well below
+    its required scratch space which in this case is 128m.
+    """
+    exec_option = vector.get_value('exec_option')
+    exec_option['max_block_mgr_memory'] = self.max_block_mgr_memory
+    exec_option['scratch_limit'] = '50m'
+    expected_error = 'Scratch space limit of %s bytes exceeded'
+    scratch_limit_in_bytes = 50 * 1024 * 1024
+    try:
+      self.execute_query(self.spill_query, exec_option)
+      assert False, "Query was expected to fail"
+    except ImpalaBeeswaxException as e:
+      assert expected_error % scratch_limit_in_bytes in str(e)
+
+  def test_with_zero_scratch_limit(self, vector):
+    """
+    Query is expected to fail with a scratch limit of zero: the low block manager
+    memory limit forces a spill, but no scratch space can be allocated.
+    """
+    exec_option = vector.get_value('exec_option')
+    exec_option['max_block_mgr_memory'] = self.max_block_mgr_memory
+    exec_option['scratch_limit'] = '0'
+    self.execute_query_expect_failure(self.client, self.spill_query, exec_option)
+
+  def test_with_unlimited_scratch_limit(self, vector):
+    """
+    Query runs to completion with a scratch limit of -1, which means default/no limit.
+    """
+    exec_option = vector.get_value('exec_option')
+    exec_option['max_block_mgr_memory'] = self.max_block_mgr_memory
+    exec_option['scratch_limit'] = '-1'
+    self.execute_query_expect_success(self.client, self.spill_query, 
exec_option)
+
+  def test_without_specifying_scratch_limit(self, vector):
+    """
+    Query runs to completion with the default setting of no scratch limit.
+    """
+    exec_option = vector.get_value('exec_option')
+    exec_option['max_block_mgr_memory'] = self.max_block_mgr_memory
+    self.execute_query_expect_success(self.client, self.spill_query, 
exec_option)
+
+  def test_with_zero_scratch_limit_no_memory_limit(self, vector):
+    """
+    Query runs to completion without spilling as there is no limit on block 
memory manager.
+    Scratch limit of zero ensures spilling is disabled.
+    """
+    exec_option = vector.get_value('exec_option')
+    exec_option['scratch_limit'] = '0'
+    self.execute_query_expect_success(self.client, self.spill_query, 
exec_option)


Reply via email to