IMPALA-4160: Remove Llama support.

Alas, poor Llama! I knew him, Impala: a system
of infinite jest, of most excellent fancy: we have
borne him on our back a thousand times; and now, how
abhorred in my imagination it is!

Done:

* Removed QueryResourceMgr, ResourceBroker, CGroupsMgr
* Removed untested 'offline' mode and NM failure detection from
  ImpalaServer
* Removed all Llama-related Thrift files
* Removed RM-related arguments to MemTracker constructors (see the
  sketch after this list)
* Deprecated all RM-related flags, printing a warning if enable_rm is
  set
* Removed expansion logic from MemTracker
* Removed VCore logic from QuerySchedule
* Removed all reservation-related logic from Scheduler
* Removed RM metric descriptions
* Miscellaneous small class changes
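
For illustration, a minimal before/after sketch of the MemTracker
constructor change (based on the diff below; the parameter-name
comments are assumptions, not the exact signature):

    // Before: the second argument carried the RM reservation limit.
    MemTracker child(-1 /* byte_limit */, -1 /* rm_reserved_limit */, "Child", &root);
    // After: the RM argument is gone; callers pass only the byte limit.
    MemTracker child(-1 /* byte_limit */, "Child", &root);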

Not done:

* Remove RM flags (--enable_rm etc.); for now they are deprecated
  no-ops (see the sketch below)
* Remove RM query options
* Changes to RequestPoolService (see IMPALA-4159)
* Remove estimates of VCores / memory from plan
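
The flag deprecation above follows the usual gflags/glog pattern; a hedged
sketch of what the startup check in impalad-main.cc might look like (the
exact message text and placement are assumptions):

    DEFINE_bool(enable_rm, false, "Deprecated.");

    // At impalad startup, warn if the deprecated flag is still set.
    if (FLAGS_enable_rm) {
      LOG(WARNING) << "--enable_rm is deprecated and has no effect; "
                   << "Llama support was removed in IMPALA-4160.";
    }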

Change-Id: Icfb14209e31f6608bb7b8a33789e00411a6447ef
Reviewed-on: http://gerrit.cloudera.org:8080/4445
Tested-by: Internal Jenkins
Reviewed-by: Henry Robinson <he...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/19de09ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/19de09ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/19de09ab

Branch: refs/heads/master
Commit: 19de09ab7db4498fa3dd6e0775e32581139dd336
Parents: 3be61f9
Author: Henry Robinson <he...@cloudera.com>
Authored: Thu Sep 15 18:09:46 2016 -0700
Committer: Henry Robinson <he...@cloudera.com>
Committed: Tue Sep 20 23:50:43 2016 +0000

----------------------------------------------------------------------
 be/CMakeLists.txt                               |   4 -
 be/generated-sources/gen-cpp/CMakeLists.txt     |   8 -
 be/src/bufferpool/reservation-tracker-test.cc   |   6 +-
 be/src/exec/blocking-join-node.cc               |   8 -
 be/src/exec/data-sink.cc                        |   2 +-
 be/src/exec/exec-node.cc                        |   5 +-
 be/src/exec/hash-join-node.cc                   |   1 -
 be/src/exec/hdfs-scan-node-base.cc              |   1 -
 be/src/exec/hdfs-scan-node.cc                   |  37 -
 be/src/exec/kudu-scan-node-test.cc              |   2 +-
 be/src/exec/kudu-scan-node.cc                   |   5 -
 be/src/exec/kudu-table-sink-test.cc             |   3 +-
 be/src/exprs/expr-test.cc                       |   4 +-
 be/src/resourcebroker/CMakeLists.txt            |  31 -
 be/src/resourcebroker/resource-broker.cc        | 850 -------------------
 be/src/resourcebroker/resource-broker.h         | 424 ---------
 be/src/runtime/buffered-block-mgr-test.cc       |   4 +-
 be/src/runtime/buffered-block-mgr.cc            |   4 +-
 be/src/runtime/collection-value-builder-test.cc |   2 +-
 be/src/runtime/coordinator.cc                   |  23 +-
 be/src/runtime/data-stream-recvr.cc             |   2 +-
 be/src/runtime/data-stream-test.cc              |   8 +-
 be/src/runtime/disk-io-mgr-test.cc              |   4 +-
 be/src/runtime/disk-io-mgr.cc                   |   4 +-
 be/src/runtime/exec-env.cc                      | 196 +----
 be/src/runtime/exec-env.h                       |  19 -
 be/src/runtime/mem-pool-test.cc                 |   4 +-
 be/src/runtime/mem-tracker-test.cc              |   6 +-
 be/src/runtime/mem-tracker.cc                   |  77 +-
 be/src/runtime/mem-tracker.h                    |  81 +-
 be/src/runtime/plan-fragment-executor.cc        |  90 +-
 be/src/runtime/runtime-filter-bank.cc           |   5 +-
 be/src/runtime/runtime-state.cc                 |  25 +-
 be/src/runtime/runtime-state.h                  |  16 +-
 be/src/runtime/test-env.cc                      |   2 +-
 be/src/scheduling/CMakeLists.txt                |   1 -
 be/src/scheduling/query-resource-mgr.cc         | 271 ------
 be/src/scheduling/query-resource-mgr.h          | 227 -----
 be/src/scheduling/query-schedule.cc             | 137 +--
 be/src/scheduling/query-schedule.h              |  38 +-
 be/src/scheduling/request-pool-service.cc       |   9 +-
 be/src/scheduling/scheduler.h                   |  16 -
 be/src/scheduling/simple-scheduler-test.cc      |   4 +-
 be/src/scheduling/simple-scheduler.cc           | 171 +---
 be/src/scheduling/simple-scheduler.h            |  41 +-
 be/src/service/impala-server.cc                 |  92 +-
 be/src/service/impala-server.h                  |  24 -
 be/src/service/impalad-main.cc                  |   8 +
 be/src/service/query-exec-state.cc              |  20 -
 be/src/util/CMakeLists.txt                      |   2 -
 be/src/util/cgroups-mgr.cc                      | 238 ------
 be/src/util/cgroups-mgr.h                       | 175 ----
 be/src/util/debug-util.h                        |   1 -
 be/src/util/llama-util.cc                       | 152 ----
 be/src/util/llama-util.h                        |  75 --
 be/src/util/thread-pool.h                       |   4 -
 be/src/util/thread.cc                           |  14 -
 be/src/util/thread.h                            |  21 -
 be/src/util/uid-util.h                          |   5 +-
 bin/bootstrap_toolchain.py                      |   2 +-
 bin/create-test-configuration.sh                |   2 +-
 bin/generate_minidump_collection_testdata.py    |   1 -
 bin/start-impala-cluster.py                     |  23 +-
 common/thrift/CMakeLists.txt                    |   2 -
 common/thrift/Frontend.thrift                   |   1 +
 common/thrift/ImpalaInternalService.thrift      |   7 -
 common/thrift/Llama.thrift                      | 276 ------
 common/thrift/ResourceBrokerService.thrift      | 119 ---
 common/thrift/metrics.json                      | 210 -----
 .../com/cloudera/impala/planner/Planner.java    |   1 +
 testdata/cluster/admin                          |  12 +-
 .../cdh5/etc/hadoop/conf/yarn-site.xml.tmpl     |  13 +-
 .../cdh5/etc/init.d/llama-application           |  38 -
 .../etc/llama/conf/llama-log4j.properties.tmpl  |  29 -
 .../cdh5/etc/llama/conf/llama-site.xml.tmpl     |  86 --
 .../common/etc/hadoop/conf/core-site.xml.tmpl   |  10 -
 76 files changed, 165 insertions(+), 4376 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 6df394f..2546bd0 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -167,7 +167,6 @@ if (DOXYGEN_FOUND)
     ${CMAKE_SOURCE_DIR}/be/src/common/
     ${CMAKE_SOURCE_DIR}/be/src/exec/
     ${CMAKE_SOURCE_DIR}/be/src/exprs/
-    ${CMAKE_SOURCE_DIR}/be/src/resourcebroker/
     ${CMAKE_SOURCE_DIR}/be/src/runtime/
     ${CMAKE_SOURCE_DIR}/be/src/scheduling/
     ${CMAKE_SOURCE_DIR}/be/src/service/
@@ -267,7 +266,6 @@ set (IMPALA_LINK_LIBS
   Exprs
   GlobalFlags
   ImpalaThrift
-  ResourceBroker
   Rpc
   Runtime
   Scheduling
@@ -295,7 +293,6 @@ if (BUILD_SHARED_LIBS)
     Statestore
     Scheduling
     Catalog
-    ResourceBroker
     ImpalaThrift
     GlobalFlags
     Common
@@ -423,7 +420,6 @@ add_subdirectory(src/catalog)
 add_subdirectory(src/codegen)
 add_subdirectory(src/common)
 add_subdirectory(src/exec)
-add_subdirectory(src/resourcebroker)
 add_subdirectory(src/exprs)
 add_subdirectory(src/runtime)
 add_subdirectory(src/scheduling)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/generated-sources/gen-cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/generated-sources/gen-cpp/CMakeLists.txt b/be/generated-sources/gen-cpp/CMakeLists.txt
index ac2907e..35f7d65 100644
--- a/be/generated-sources/gen-cpp/CMakeLists.txt
+++ b/be/generated-sources/gen-cpp/CMakeLists.txt
@@ -37,10 +37,6 @@ set(SRC_FILES
   ImpalaService_constants.cpp
   ImpalaService_types.cpp
   ImpalaHiveServer2Service.cpp
-  Llama_constants.cpp
-  Llama_types.cpp
-  LlamaAMService.cpp
-  LlamaNotificationService.cpp
   beeswax_constants.cpp
   beeswax_types.cpp
   BeeswaxService.cpp
@@ -79,10 +75,6 @@ set(SRC_FILES
   NetworkTestService.cpp
   PlanNodes_constants.cpp
   PlanNodes_types.cpp
-  ResourceBrokerNotificationService.cpp
-  ResourceBrokerService_constants.cpp
-  ResourceBrokerService_types.cpp
-  ResourceBrokerService.cpp
   Results_constants.cpp
   Results_types.cpp
   Partitions_constants.cpp

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/bufferpool/reservation-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/reservation-tracker-test.cc b/be/src/bufferpool/reservation-tracker-test.cc
index e43efb8..8dd1e41 100644
--- a/be/src/bufferpool/reservation-tracker-test.cc
+++ b/be/src/bufferpool/reservation-tracker-test.cc
@@ -235,8 +235,8 @@ TEST_F(ReservationTrackerTest, MemTrackerIntegrationTwoLevel) {
   // of different code paths.
   root_.InitRootTracker(NewProfile(), MIN_BUFFER_LEN * 100);
   MemTracker root_mem_tracker;
-  MemTracker child_mem_tracker1(-1, -1, "Child 1", &root_mem_tracker);
-  MemTracker child_mem_tracker2(MIN_BUFFER_LEN * 50, -1, "Child 2", &root_mem_tracker);
+  MemTracker child_mem_tracker1(-1, "Child 1", &root_mem_tracker);
+  MemTracker child_mem_tracker2(MIN_BUFFER_LEN * 50, "Child 2", &root_mem_tracker);
   ReservationTracker child_reservations1, child_reservations2;
   child_reservations1.InitChildTracker(
       NewProfile(), &root_, &child_mem_tracker1, 500 * MIN_BUFFER_LEN);
@@ -317,7 +317,7 @@ TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) {
   reservations[0].InitRootTracker(NewProfile(), 500);
   for (int i = 1; i < HIERARCHY_DEPTH; ++i) {
     mem_trackers[i].reset(new MemTracker(
-        mem_limits[i], -1, Substitute("Child $0", i), mem_trackers[i - 1].get()));
+        mem_limits[i], Substitute("Child $0", i), mem_trackers[i - 1].get()));
     reservations[i].InitChildTracker(
         NewProfile(), &reservations[i - 1], mem_trackers[i].get(), 500);
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/blocking-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index 2c17d13..309bde4 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -25,7 +25,6 @@
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/tuple-row.h"
-#include "util/cgroups-mgr.h"
 #include "util/debug-util.h"
 #include "util/runtime-profile-counters.h"
 #include "util/time.h"
@@ -196,13 +195,6 @@ Status BlockingJoinNode::ConstructBuildAndOpenProbe(RuntimeState* state,
     Thread build_thread(
         node_name_, "build thread", 
bind(&BlockingJoinNode::ProcessBuildInputAsync, this,
                                         state, build_sink, 
&build_side_status));
-    if (!state->cgroup().empty()) {
-      Status status = state->exec_env()->cgroups_mgr()->AssignThreadToCgroup(
-          build_thread, state->cgroup());
-      // If AssignThreadToCgroup() failed, we still need to wait for the build-side
-      // thread to complete before returning, so just log that error.
-      if (!status.ok()) state->LogError(status.msg());
-    }
    // Open the left child so that it may perform any initialisation in parallel.
    // Don't exit even if we see an error, we still need to wait for the build thread
     // to finish.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index a6c8fcd..8c8b2dc 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -153,7 +153,7 @@ Status DataSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
   DCHECK(mem_tracker != NULL);
   profile_ = state->obj_pool()->Add(new RuntimeProfile(state->obj_pool(), GetName()));
   mem_tracker_ = mem_tracker;
-  expr_mem_tracker_.reset(new MemTracker(-1, -1, "Exprs", mem_tracker, false));
+  expr_mem_tracker_.reset(new MemTracker(-1, "Exprs", mem_tracker, false));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 59ac997..2dce0d3 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -147,10 +147,9 @@ Status ExecNode::Prepare(RuntimeState* state) {
   DCHECK(runtime_profile_.get() != NULL);
   rows_returned_counter_ =
       ADD_COUNTER(runtime_profile_, "RowsReturned", TUnit::UNIT);
-  mem_tracker_.reset(new MemTracker(
-      runtime_profile_.get(), -1, -1, runtime_profile_->name(),
+  mem_tracker_.reset(new MemTracker(runtime_profile_.get(), -1, runtime_profile_->name(),
       state->instance_mem_tracker()));
-  expr_mem_tracker_.reset(new MemTracker(-1, -1, "Exprs", mem_tracker_.get(), false));
+  expr_mem_tracker_.reset(new MemTracker(-1, "Exprs", mem_tracker_.get(), false));
 
   rows_returned_rate_ = runtime_profile()->AddDerivedCounter(
       ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-join-node.cc b/be/src/exec/hash-join-node.cc
index 946ab48..52ba1d1 100644
--- a/be/src/exec/hash-join-node.cc
+++ b/be/src/exec/hash-join-node.cc
@@ -39,7 +39,6 @@
 
 #include "common/names.h"
 
-DECLARE_string(cgroup_hierarchy_path);
 DEFINE_bool(enable_probe_side_filtering, true, "Deprecated.");
 
 using namespace impala;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index de1dad0..c03817b 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -43,7 +43,6 @@
 #include "runtime/raw-value.h"
 #include "runtime/row-batch.h"
 #include "runtime/string-buffer.h"
-#include "scheduling/query-resource-mgr.h"
 #include "util/bit-util.h"
 #include "util/container-util.h"
 #include "util/debug-util.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 83f6452..03f81ed 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -27,7 +27,6 @@
 #include "runtime/runtime-state.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
-#include "scheduling/query-resource-mgr.h"
 #include "util/debug-util.h"
 #include "util/disk-info.h"
 #include "util/runtime-profile-counters.h"
@@ -154,12 +153,6 @@ Status HdfsScanNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state));
 
-  // Assign scanner thread group to cgroup, if any.
-  if (!state->cgroup().empty()) {
-    scanner_threads_.SetCgroupsMgr(state->exec_env()->cgroups_mgr());
-    scanner_threads_.SetCgroup(state->cgroup());
-  }
-
   // Compute the minimum bytes required to start a new thread. This is based on the
   // file format.
   // The higher the estimate, the less likely it is the query will fail but more likely
@@ -212,12 +205,6 @@ Status HdfsScanNode::Open(RuntimeState* state) {
   thread_avail_cb_id_ = runtime_state_->resource_pool()->AddThreadAvailableCb(
       bind<void>(mem_fn(&HdfsScanNode::ThreadTokenAvailableCb), this, _1));
 
-  if (runtime_state_->query_resource_mgr() != NULL) {
-    rm_callback_id_ = runtime_state_->query_resource_mgr()->AddVcoreAvailableCb(
-        bind<void>(mem_fn(&HdfsScanNode::ThreadTokenAvailableCb), this,
-            runtime_state_->resource_pool()));
-  }
-
   return Status::OK();
 }
 
@@ -228,9 +215,6 @@ void HdfsScanNode::Close(RuntimeState* state) {
   if (thread_avail_cb_id_ != -1) {
     state->resource_pool()->RemoveThreadAvailableCb(thread_avail_cb_id_);
   }
-  if (state->query_resource_mgr() != NULL && rm_callback_id_ != -1) {
-    state->query_resource_mgr()->RemoveVcoreAvailableCb(rm_callback_id_);
-  }
 
   scanner_threads_.JoinAll();
 
@@ -326,8 +310,6 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
   //  6. Don't start up a thread if there isn't enough memory left to run it.
   //  7. Don't start up more than maximum number of scanner threads configured.
   //  8. Don't start up if there are no thread tokens.
-  //  9. Don't start up if we are running too many threads for our vcore allocation
-  //  (unless the thread is reserved, in which case it has to run).
 
   // Case 4. We have not issued the initial ranges so don't start a scanner thread.
   // Issuing ranges will call this function and we'll start the scanner threads then.
@@ -360,25 +342,12 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
       break;
     }
 
-    // Case 9.
-    if (!is_reserved) {
-      if (runtime_state_->query_resource_mgr() != NULL &&
-          runtime_state_->query_resource_mgr()->IsVcoreOverSubscribed()) {
-        pool->ReleaseThreadToken(false);
-        break;
-      }
-    }
-
     COUNTER_ADD(&active_scanner_thread_counter_, 1);
     COUNTER_ADD(num_scanner_threads_started_counter_, 1);
     stringstream ss;
     ss << "scanner-thread(" << num_scanner_threads_started_counter_->value() 
<< ")";
     scanner_threads_.AddThread(
         new Thread("hdfs-scan-node", ss.str(), &HdfsScanNode::ScannerThread, 
this));
-
-    if (runtime_state_->query_resource_mgr() != NULL) {
-      runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(1);
-    }
   }
 }
 
@@ -411,9 +380,6 @@ void HdfsScanNode::ScannerThread() {
           // Unlock before releasing the thread token to avoid deadlock in
           // ThreadTokenAvailableCb().
           l.unlock();
-          if (runtime_state_->query_resource_mgr() != NULL) {
-            runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(-1);
-          }
           runtime_state_->resource_pool()->ReleaseThreadToken(false);
           if (filter_status.ok()) {
             for (auto& ctx: filter_ctxs) {
@@ -495,9 +461,6 @@ void HdfsScanNode::ScannerThread() {
   }
 
   COUNTER_ADD(&active_scanner_thread_counter_, -1);
-  if (runtime_state_->query_resource_mgr() != NULL) {
-    runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(-1);
-  }
   runtime_state_->resource_pool()->ReleaseThreadToken(false);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/kudu-scan-node-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-test.cc b/be/src/exec/kudu-scan-node-test.cc
index 8324628..a0eabef 100644
--- a/be/src/exec/kudu-scan-node-test.cc
+++ b/be/src/exec/kudu-scan-node-test.cc
@@ -85,7 +85,7 @@ class KuduScanNodeTest : public testing::Test {
       exec_env_->thread_mgr()->UnregisterPool(runtime_state_->resource_pool());
     }
 
-    runtime_state_.reset(new RuntimeState(TExecPlanFragmentParams(), "", exec_env_.get()));
+    runtime_state_.reset(new RuntimeState(TExecPlanFragmentParams(), exec_env_.get()));
     runtime_state_->InitMemTrackers(TUniqueId(), NULL, -1);
 
     TKuduScanNode kudu_scan_node_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index e171afa..827631a 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -35,7 +35,6 @@
 #include "runtime/row-batch.h"
 #include "runtime/string-value.h"
 #include "runtime/tuple-row.h"
-#include "scheduling/query-resource-mgr.h"
 #include "util/disk-info.h"
 #include "util/jni-util.h"
 #include "util/periodic-counter-updater.h"
@@ -248,10 +247,6 @@ void KuduScanNode::ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
     VLOG_RPC << "Thread started: " << name;
     scanner_threads_.AddThread(new Thread("kudu-scan-node", name,
         &KuduScanNode::RunScannerThread, this, name, token));
-
-    if (runtime_state_->query_resource_mgr() != NULL) {
-      runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(1);
-    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/kudu-table-sink-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink-test.cc b/be/src/exec/kudu-table-sink-test.cc
index 96aec55..a1cbb68 100644
--- a/be/src/exec/kudu-table-sink-test.cc
+++ b/be/src/exec/kudu-table-sink-test.cc
@@ -49,8 +49,7 @@ static const int THIRD_SLOT_ID = 3;
 
 class KuduTableSinkTest : public testing::Test {
  public:
-  KuduTableSinkTest()
-      : runtime_state_(TExecPlanFragmentParams(), "", &exec_env_) {}
+  KuduTableSinkTest() : runtime_state_(TExecPlanFragmentParams(), &exec_env_) {}
 
   virtual void SetUp() {
     // Create a Kudu client and the table (this will abort the test here

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exprs/expr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index 8a19603..8064de0 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -1025,7 +1025,7 @@ template <typename T> void TestSingleLiteralConstruction(
     const ColumnType& type, const T& value, const string& string_val) {
   ObjectPool pool;
   RowDescriptor desc;
-  RuntimeState state(TExecPlanFragmentParams(), "", NULL);
+  RuntimeState state(TExecPlanFragmentParams(), NULL);
   MemTracker tracker;
 
   Expr* expr = pool.Add(new Literal(type, value));
@@ -1041,7 +1041,7 @@ TEST_F(ExprTest, NullLiteral) {
   for (int type = TYPE_BOOLEAN; type != TYPE_DATE; ++type) {
     NullLiteral expr(static_cast<PrimitiveType>(type));
     ExprContext ctx(&expr);
-    RuntimeState state(TExecPlanFragmentParams(), "", NULL);
+    RuntimeState state(TExecPlanFragmentParams(), NULL);
     MemTracker tracker;
     EXPECT_OK(ctx.Prepare(&state, RowDescriptor(), &tracker));
     EXPECT_OK(ctx.Open(&state));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/resourcebroker/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/resourcebroker/CMakeLists.txt b/be/src/resourcebroker/CMakeLists.txt
deleted file mode 100644
index 776152c..0000000
--- a/be/src/resourcebroker/CMakeLists.txt
+++ /dev/null
@@ -1,31 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-
-# where to put generated libraries
-set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/resourcebroker")
-
-# where to put generated binaries
-set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/resourcebroker")
-
-add_library(ResourceBroker
-  resource-broker.cc
-)
-add_dependencies(ResourceBroker thrift-deps)
-
-# TODO: Add resource broker BE test
-# ADD_BE_TEST(resource-broker-test)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/resourcebroker/resource-broker.cc
----------------------------------------------------------------------
diff --git a/be/src/resourcebroker/resource-broker.cc b/be/src/resourcebroker/resource-broker.cc
deleted file mode 100644
index 4690c59..0000000
--- a/be/src/resourcebroker/resource-broker.cc
+++ /dev/null
@@ -1,850 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "resourcebroker/resource-broker.h"
-
-#include <boost/algorithm/string/join.hpp>
-#include <boost/uuid/uuid.hpp>
-#include <boost/uuid/uuid_generators.hpp>
-#include <boost/uuid/uuid_io.hpp>
-#include <boost/lexical_cast.hpp>
-#include <gutil/strings/substitute.h>
-#include <thrift/Thrift.h>
-
-#include "common/status.h"
-#include "rpc/thrift-util.h"
-#include "rpc/thrift-server.h"
-#include "scheduling/query-resource-mgr.h"
-#include "scheduling/scheduler.h"
-#include "util/debug-util.h"
-#include "util/uid-util.h"
-#include "util/network-util.h"
-#include "util/llama-util.h"
-#include "util/time.h"
-#include "gen-cpp/ResourceBrokerService.h"
-#include "gen-cpp/Llama_types.h"
-
-#include "common/names.h"
-
-using boost::algorithm::join;
-using boost::algorithm::to_lower;
-using boost::uuids::random_generator;
-using namespace ::apache::thrift::server;
-using namespace ::apache::thrift;
-using namespace impala;
-using namespace strings;
-
-DECLARE_int64(llama_registration_timeout_secs);
-DECLARE_int64(llama_registration_wait_secs);
-DECLARE_int64(llama_max_request_attempts);
-
-DECLARE_int32(resource_broker_cnxn_attempts);
-DECLARE_int32(resource_broker_cnxn_retry_interval_ms);
-DECLARE_int32(resource_broker_send_timeout);
-DECLARE_int32(resource_broker_recv_timeout);
-
-static const string LLAMA_KERBEROS_SERVICE_NAME = "llama";
-
-namespace impala {
-
-// String to search for in Llama error messages to detect that Llama has restarted,
-// and hence the resource broker must re-register.
-const string LLAMA_RESTART_SEARCH_STRING = "unknown handle";
-
-class LlamaNotificationThriftIf : public llama::LlamaNotificationServiceIf {
- public:
-  LlamaNotificationThriftIf(ResourceBroker* resource_broker)
-    : resource_broker_(resource_broker) {}
-
-  virtual void AMNotification(llama::TLlamaAMNotificationResponse& response,
-      const llama::TLlamaAMNotificationRequest& request) {
-    resource_broker_->AMNotification(request, response);
-  }
-
-  virtual void NMNotification(llama::TLlamaNMNotificationResponse& response,
-      const llama::TLlamaNMNotificationRequest& request) {
-    LOG(WARNING) << "Ignoring node-manager notification. Handling not yet 
implemented.";
-    response.status.__set_status_code(llama::TStatusCode::OK);
-  }
-
-  virtual ~LlamaNotificationThriftIf() {}
-
- private:
-  ResourceBroker* resource_broker_;
-};
-
-ResourceBroker::ResourceBroker(const vector<TNetworkAddress>& llama_addresses,
-    const TNetworkAddress& llama_callback_address, MetricGroup* metrics) :
-    llama_addresses_(llama_addresses),
-    active_llama_addr_idx_(-1),
-    llama_callback_address_(llama_callback_address),
-    metrics_(metrics),
-    scheduler_(NULL),
-    llama_callback_thrift_iface_(new LlamaNotificationThriftIf(this)),
-    llama_client_cache_(new ClientCache<llama::LlamaAMServiceClient>(
-        FLAGS_resource_broker_cnxn_attempts,
-        FLAGS_resource_broker_cnxn_retry_interval_ms,
-        FLAGS_resource_broker_send_timeout,
-        FLAGS_resource_broker_recv_timeout,
-        LLAMA_KERBEROS_SERVICE_NAME)) {
-  DCHECK(metrics != NULL);
-  llama_client_cache_->InitMetrics(metrics, "resource-broker");
-  active_llama_metric_ = metrics->AddProperty<string>(
-      "resource-broker.active-llama", "none");
-  active_llama_handle_metric_ = metrics->AddProperty<string>(
-      "resource-broker.active-llama-handle", "none");
-
-  reservation_rpc_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
-      "resource-broker.reservation-request-rpc-time");
-  reservation_response_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
-      "resource-broker.reservation-request-response-time");
-  reservation_requests_total_metric_ = metrics->AddCounter<int64_t>(
-      "resource-broker.reservation-requests-total", 0);
-  reservation_requests_fulfilled_metric_ = metrics->AddCounter<int64_t>(
-      "resource-broker.reservation-requests-fulfilled", 0);
-  reservation_requests_failed_metric_ = metrics->AddCounter<int64_t>(
-      "resource-broker.reservation-requests-failed", 0);
-  reservation_requests_rejected_metric_ = metrics->AddCounter<int64_t>(
-      "resource-broker.reservation-requests-rejected", 0);
-  reservation_requests_timedout_metric_ = metrics->AddCounter<int64_t>(
-      "resource-broker.reservation-requests-timedout", 0);
-
-  expansion_rpc_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
-      "resource-broker.expansion-request-rpc-time");
-  expansion_response_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
-      "resource-broker.expansion-request-response-time");
-  expansion_requests_total_metric_ = metrics->AddCounter<int64_t>(
-      "resource-broker.expansion-requests-total", 0);
-  expansion_requests_fulfilled_metric_ = metrics->AddCounter<int64_t>(
-      "resource-broker.expansion-requests-fulfilled", 0);
-  expansion_requests_failed_metric_ = metrics->AddCounter<int64_t>(
-      "resource-broker.expansion-requests-failed", 0);
-  expansion_requests_rejected_metric_ = metrics->AddCounter<int64_t>(
-      "resource-broker.expansion-requests-rejected", 0);
-  expansion_requests_timedout_metric_ = metrics->AddCounter<int64_t>(
-      "resource-broker.expansion-requests-timedout", 0);
-
-  requests_released_metric_ = metrics->AddCounter<int64_t>(
-      "resource-broker.requests-released", 0);
-  allocated_memory_metric_ = metrics->AddGauge<uint64_t>(
-      "resource-broker.memory-resources-in-use", 0L);
-  allocated_vcpus_metric_ = metrics->AddGauge<uint64_t>(
-      "resource-broker.vcpu-resources-in-use", 0);
-}
-
-Status ResourceBroker::Init() {
-  // The scheduler must have been set before calling Init().
-  DCHECK(scheduler_ != NULL);
-  DCHECK(llama_callback_thrift_iface_ != NULL);
-  if (llama_addresses_.size() == 0) {
-    return Status("No Llama addresses configured (see --llama_addresses)");
-  }
-
-  boost::shared_ptr<TProcessor> llama_callback_proc(
-      new llama::LlamaNotificationServiceProcessor(llama_callback_thrift_iface_));
-  llama_callback_server_.reset(new ThriftServer("llama-callback", llama_callback_proc,
-      llama_callback_address_.port, NULL, metrics_, 5));
-  RETURN_IF_ERROR(llama_callback_server_->Start());
-
-  // Generate client id for registration with Llama, and register with LLama.
-  random_generator uuid_generator;
-  llama_client_id_ = uuid_generator();
-  RETURN_IF_ERROR(RegisterWithLlama());
-  RETURN_IF_ERROR(RefreshLlamaNodes());
-  return Status::OK();
-}
-
-Status ResourceBroker::RegisterWithLlama() {
-  // Remember the current llama_handle_ to detect if another thread has already
-  // completed the registration successfully.
-  llama::TUniqueId current_llama_handle = llama_handle_;
-
-  // Start time that this thread attempted registration. Used to limit the time that a
-  // query will wait for re-registration with the Llama to succeed.
-  int64_t start = MonotonicSeconds();
-  lock_guard<mutex> l(llama_registration_lock_);
-  if (llama_handle_ != current_llama_handle) return Status::OK();
-
-  active_llama_metric_->set_value("none");
-  active_llama_handle_metric_->set_value("none");
-
-  int llama_addr_idx = (active_llama_addr_idx_ + 1) % llama_addresses_.size();
-  int64_t now = MonotonicSeconds();
-  while (FLAGS_llama_registration_timeout_secs == -1 ||
-      (now - start) < FLAGS_llama_registration_timeout_secs) {
-    // Connect to the Llama at llama_address.
-    const TNetworkAddress& llama_address = llama_addresses_[llama_addr_idx];
-    // Client status will be ok if a Thrift connection could be successfully established
-    // for the returned client at some point in the past. Hence, there is no guarantee
-    // that the connection is still valid now and we must check for broken pipes, etc.
-    Status client_status;
-    ClientConnection<llama::LlamaAMServiceClient> llama_client(llama_client_cache_.get(),
-        llama_address, &client_status);
-    if (client_status.ok()) {
-      // Register this resource broker with Llama.
-      llama::TLlamaAMRegisterRequest request;
-      request.__set_version(llama::TLlamaServiceVersion::V1);
-      llama::TUniqueId llama_uuid;
-      UUIDToTUniqueId(llama_client_id_, &llama_uuid);
-      request.__set_client_id(llama_uuid);
-
-      llama::TNetworkAddress callback_address;
-      callback_address << llama_callback_address_;
-      request.__set_notification_callback_service(callback_address);
-      llama::TLlamaAMRegisterResponse response;
-      LOG(INFO) << "Registering Resource Broker with Llama at " << 
llama_address;
-      Status rpc_status =
-          llama_client.DoRpc(&llama::LlamaAMServiceClient::Register, request, 
&response);
-      if (rpc_status.ok()) {
-        // TODO: Is there a period where an inactive Llama may respond to RPCs?
-        // If so, then we need to keep cycling through Llamas here and not
-        // return an error.
-        RETURN_IF_ERROR(LlamaStatusToImpalaStatus(
-            response.status, "Failed to register Resource Broker with Llama."));
-        LOG(INFO) << "Received Llama client handle " << response.am_handle
-                  << ((response.am_handle == llama_handle_) ? " (same as old)" : "");
-        llama_handle_ = response.am_handle;
-        break;
-      }
-    }
-    // Cycle through the list of Llama addresses for Llama failover.
-    llama_addr_idx = (llama_addr_idx + 1) % llama_addresses_.size();
-    LOG(INFO) << "Failed to connect to Llama at " << llama_address << "." << 
endl
-              << "Error: " << client_status.GetDetail() << endl
-              << "Retrying to connect to Llama at "
-              << llama_addresses_[llama_addr_idx] << " in "
-              << FLAGS_llama_registration_wait_secs << "s.";
-    // Sleep to give Llama time to recover/failover before the next attempt.
-    SleepForMs(FLAGS_llama_registration_wait_secs * 1000);
-    now = MonotonicSeconds();
-  }
-  DCHECK(FLAGS_llama_registration_timeout_secs != -1);
-  if ((now - start) >= FLAGS_llama_registration_timeout_secs) {
-    return Status("Failed to (re-)register Resource Broker with Llama.");
-  }
-
-  if (llama_addr_idx != active_llama_addr_idx_) {
-    // TODO: We've switched to a different Llama (failover). Cancel all queries
-    // coordinated by this Impalad to free up physical resources that are not
-    // accounted for anymore by Yarn.
-  }
-
-  // If we reached this point, (re-)registration was successful.
-  active_llama_addr_idx_ = llama_addr_idx;
-  active_llama_metric_->set_value(lexical_cast<string>(llama_addresses_[llama_addr_idx]));
-  active_llama_handle_metric_->set_value(lexical_cast<string>(llama_handle_));
-  return Status::OK();
-}
-
-bool ResourceBroker::LlamaHasRestarted(const llama::TStatus& status) const {
-  if (status.status_code == llama::TStatusCode::OK || !status.__isset.error_msgs) {
-    return false;
-  }
-  // Check whether one of the error messages contains LLAMA_RESTART_SEARCH_STRING.
-  for (int i = 0; i < status.error_msgs.size(); ++i) {
-    string error_msg = status.error_msgs[i];
-    to_lower(error_msg);
-    if (error_msg.find(LLAMA_RESTART_SEARCH_STRING) != string::npos) {
-      LOG(INFO) << "Assuming Llama restart from error message: " << 
status.error_msgs[i];
-      return true;
-    }
-  }
-  return false;
-}
-
-void ResourceBroker::Close() {
-  // Close connections to all Llama addresses, not just the active one.
-  for (const TNetworkAddress& llama_address: llama_addresses_) {
-    llama_client_cache_->CloseConnections(llama_address);
-  }
-  llama_callback_server_->Join();
-}
-
-void ResourceBroker::CreateLlamaReservationRequest(
-    const TResourceBrokerReservationRequest& src,
-    llama::TLlamaAMReservationRequest& dest) {
-  dest.version = llama::TLlamaServiceVersion::V1;
-  dest.am_handle = llama_handle_;
-  dest.gang = src.gang;
-  // Queue is optional, so must be explicitly set for all versions of Thrift to work
-  // together.
-  dest.__set_queue(src.queue);
-  dest.user = src.user;
-  dest.resources = src.resources;
-  random_generator uuid_generator;
-  llama::TUniqueId request_id;
-  UUIDToTUniqueId(uuid_generator(), &request_id);
-  dest.__set_reservation_id(request_id);
-}
-
-template <class F, typename LlamaReqType, typename LlamaRespType>
-Status ResourceBroker::LlamaRpc(const F& f, LlamaReqType* request,
-    LlamaRespType* response, StatsMetric<double>* rpc_time_metric) {
-  int attempts = 0;
-  MonotonicStopWatch sw;
-  // Indicates whether to re-register with Llama before the next RPC attempt,
-  // e.g. because Llama has restarted or become unavailable.
-  bool register_with_llama = false;
-  while (attempts < FLAGS_llama_max_request_attempts) {
-    if (register_with_llama) {
-      RETURN_IF_ERROR(ReRegisterWithLlama(*request, response));
-      // Set the new Llama handle received from re-registering.
-      request->__set_am_handle(llama_handle_);
-      VLOG_RPC << "Retrying Llama RPC after re-registration: " << *request;
-      register_with_llama = false;
-    }
-    ++attempts;
-    Status rpc_status;
-    ClientConnection<llama::LlamaAMServiceClient> llama_client(llama_client_cache_.get(),
-        llama_addresses_[active_llama_addr_idx_], &rpc_status);
-    if (!rpc_status.ok()) {
-      register_with_llama = true;
-      continue;
-    }
-
-    sw.Start();
-    Status status = llama_client.DoRpc(f, *request, response);
-    if (!status.ok()) {
-      VLOG_RPC << "Error making Llama RPC: " << status.GetDetail();
-      register_with_llama = status.code() == TErrorCode::RPC_CLIENT_CONNECT_FAILURE;
-      continue;
-    }
-    if (rpc_time_metric != NULL) {
-      rpc_time_metric->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
-    }
-
-    // Check whether Llama has been restarted. If so, re-register with it.
-    // Break out of the loop here upon success of the RPC.
-    if (!LlamaHasRestarted(response->status)) break;
-    register_with_llama = true;
-  }
-  if (attempts >= FLAGS_llama_max_request_attempts) {
-    return Status(Substitute(
-        "Request aborted after $0 attempts due to connectivity issues with 
Llama.",
-        FLAGS_llama_max_request_attempts));
-  }
-  return Status::OK();
-}
-
-template <typename LlamaReqType, typename LlamaRespType>
-Status ResourceBroker::ReRegisterWithLlama(const LlamaReqType& request,
-    LlamaRespType* response) {
-  RETURN_IF_ERROR(RegisterWithLlama());
-  return RefreshLlamaNodes();
-}
-
-template <>
-Status ResourceBroker::ReRegisterWithLlama(const llama::TLlamaAMGetNodesRequest& request,
-    llama::TLlamaAMGetNodesResponse* response) {
-  return RegisterWithLlama();
-}
-
-void ResourceBroker::PendingRequest::GetResources(ResourceMap* resources) {
-  resources->clear();
-  for (const llama::TAllocatedResource& resource: allocated_resources_) {
-    TNetworkAddress host = MakeNetworkAddress(resource.location);
-    (*resources)[host] = resource;
-    VLOG_QUERY << "Getting allocated resource for reservation id "
-               << reservation_id_ << " and location " << host;
-  }
-}
-
-void ResourceBroker::PendingRequest::SetResources(
-    const vector<llama::TAllocatedResource>& resources) {
-  // TODO: Llama returns a dump of all resources that we need to manually group by
-  // reservation id. Can Llama do the grouping for us?
-  for (const llama::TAllocatedResource& resource: resources) {
-    // Ignore resources that don't belong to the given reservation id.
-    if (resource.reservation_id == request_id()) {
-      allocated_resources_.push_back(resource);
-    }
-  }
-}
-
-bool ResourceBroker::WaitForNotification(int64_t timeout, ResourceMap* resources,
-    bool* timed_out, PendingRequest* pending_request) {
-  bool request_granted;
-  if (timeout <= 0) {
-    *timed_out = false;
-    request_granted = pending_request->promise()->Get();
-  } else {
-    request_granted = pending_request->promise()->Get(timeout, timed_out);
-  }
-
-  // Remove the promise from the pending-requests map.
-  const llama::TUniqueId& res_id = pending_request->reservation_id();
-  {
-    lock_guard<mutex> l(pending_requests_lock_);
-    pending_requests_.erase(pending_request->request_id());
-    if (pending_request->is_expansion()) {
-      PendingExpansionIdsMap::iterator it = pending_expansion_ids_.find(res_id);
-      if (it == pending_expansion_ids_.end()) {
-        // If the AMNotification was received as the reservation was being cleaned up,
-        // it's possible that the pending/allocated request structures were updated
-        // before this thread was able to acquire the lock.
-        VLOG_RPC << "Didn't find reservation=" << res_id << " in pending requests";
-        return false;
-      }
-      it->second.erase(pending_request->request_id());
-    }
-  }
-
-  if (request_granted && !*timed_out) {
-    pending_request->GetResources(resources);
-    int64_t total_memory_mb = 0L;
-    int32_t total_vcpus = 0;
-    for (const ResourceMap::value_type& resource: *resources) {
-      total_memory_mb += resource.second.memory_mb;
-      total_vcpus += resource.second.v_cpu_cores;
-    }
-    {
-      lock_guard<mutex> l(allocated_requests_lock_);
-      AllocatedRequestMap::iterator it = allocated_requests_.find(res_id);
-      if (it == allocated_requests_.end()) {
-        // The reservation may have already been cleaned up. See above.
-        VLOG_RPC << "Didn't find reservation=" << res_id << " in allocated 
requests";
-        return false;
-      }
-      it->second.push_back(AllocatedRequest(res_id, total_memory_mb, total_vcpus,
-          pending_request->is_expansion()));
-      allocated_memory_metric_->Increment(total_memory_mb * 1024L * 1024L);
-      allocated_vcpus_metric_->Increment(total_vcpus);
-    }
-  }
-
-  return request_granted;
-}
-
-Status ResourceBroker::Expand(const TUniqueId& reservation_id,
-    const llama::TResource& resource, int64_t timeout_ms, llama::TUniqueId* expansion_id,
-    llama::TAllocatedResource* allocated_resource) {
-  llama::TLlamaAMReservationExpansionRequest ll_request;
-  llama::TLlamaAMReservationExpansionResponse ll_response;
-
-  ll_request.version = llama::TLlamaServiceVersion::V1;
-  ll_request.am_handle = llama_handle_;
-  ll_request.expansion_of << reservation_id;
-  random_generator uuid_generator;
-  llama::TUniqueId request_id;
-  UUIDToTUniqueId(uuid_generator(), &request_id);
-  ll_request.__set_expansion_id(request_id);
-  ll_request.resource = resource;
-  VLOG_RPC << "Sending expansion request for reservation_id=" << reservation_id
-           << " expansion_id=" << request_id
-           << " resource=" << resource;
-
-  PendingRequest* pending_request;
-  {
-    lock_guard<mutex> l(pending_requests_lock_);
-    PendingExpansionIdsMap::iterator it =
-        pending_expansion_ids_.find(ll_request.expansion_of);
-    // If pending_expansion_ids_ doesn't contain the reservation id then the
-    // QueryResourceMgr has already been unregistered and the reservation has been
-    // released.
-    if (it == pending_expansion_ids_.end()) {
-      return Status(Substitute("Resource expansion request (expansion id=$0, "
-          "reservation id=$1) made after reservation released.",
-          PrintId(ll_request.expansion_id), PrintId(reservation_id)));
-    }
-    it->second.insert(request_id);
-    pending_request = new PendingRequest(ll_request.expansion_of, request_id, true);
-    pending_requests_.insert(make_pair(request_id, pending_request));
-  }
-
-  MonotonicStopWatch sw;
-  sw.Start();
-  Status status = LlamaRpc(&llama::LlamaAMServiceClient::Expand, &ll_request,
-      &ll_response, expansion_rpc_time_metric_);
-  // Check the status of the response.
-  if (!status.ok()) {
-    expansion_requests_failed_metric_->Increment(1);
-    return status;
-  }
-
-  Status request_status = LlamaStatusToImpalaStatus(ll_response.status);
-  if (!request_status.ok()) {
-    expansion_requests_failed_metric_->Increment(1);
-    return request_status;
-  }
-
-  ResourceMap allocated_resources;
-  bool timed_out = false;
-  bool request_granted = WaitForNotification(timeout_ms,
-      &allocated_resources, &timed_out, pending_request);
-
-  if (timed_out) {
-    expansion_requests_timedout_metric_->Increment(1);
-    Status release_status = ReleaseRequest(request_id);
-    if (!release_status.ok()) {
-      VLOG_QUERY << "Error releasing timed out expansion request, 
expansion_id="
-                 << request_id << " status: " << release_status.GetDetail();
-    }
-    return Status(Substitute("Resource expansion request (expansion id=$0, "
-        "reservation id=$1) exceeded timeout of $2.",
-        PrintId(ll_request.expansion_id),
-        PrintId(reservation_id),
-        PrettyPrinter::Print(timeout_ms * 1000L * 1000L, TUnit::TIME_NS)));
-  }
-  expansion_response_time_metric_->Update(
-      sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
-
-  if (!request_granted) {
-    if (pending_request->is_cancelled()) {
-      return Status(Substitute("Resource expansion request (expansion id=$0, "
-          "reservation id=$1) was cancelled.", 
PrintId(ll_request.expansion_id),
-          PrintId(reservation_id)));
-    }
-    expansion_requests_rejected_metric_->Increment(1);
-    return Status(Substitute("Resource expansion request (expansion id=$0, "
-        "reservation id=$1) was rejected.", PrintId(ll_request.expansion_id),
-        PrintId(reservation_id)));
-  }
-
-  DCHECK_EQ(allocated_resources.size(), 1);
-  *allocated_resource = allocated_resources.begin()->second;
-  *expansion_id = request_id;
-
-  VLOG_QUERY << "Fulfilled expansion for id=" << ll_response.expansion_id
-             << " resource=" << *allocated_resource;
-  expansion_requests_fulfilled_metric_->Increment(1);
-  return Status::OK();
-}
-
-Status ResourceBroker::Reserve(const TResourceBrokerReservationRequest& request,
-    TResourceBrokerReservationResponse* response) {
-  VLOG_QUERY << "Sending reservation request: " << request;
-  reservation_requests_total_metric_->Increment(1);
-
-  llama::TLlamaAMReservationRequest ll_request;
-  llama::TLlamaAMReservationResponse ll_response;
-  CreateLlamaReservationRequest(request, ll_request);
-  const llama::TUniqueId& res_id = ll_request.reservation_id;
-
-  PendingRequest* pending_request;
-  {
-    pending_request = new PendingRequest(res_id, res_id, false);
-    lock_guard<mutex> l(pending_requests_lock_);
-    pending_requests_.insert(make_pair(pending_request->request_id(), pending_request));
-  }
-  {
-    lock_guard<mutex> l(allocated_requests_lock_);
-    DCHECK(allocated_requests_.find(res_id) == allocated_requests_.end());
-    allocated_requests_[res_id] = vector<AllocatedRequest>();
-  }
-
-  MonotonicStopWatch sw;
-  sw.Start();
-  Status status = LlamaRpc(&llama::LlamaAMServiceClient::Reserve, &ll_request,
-      &ll_response, reservation_rpc_time_metric_);
-  // Check the status of the response.
-  if (!status.ok()) {
-    reservation_requests_failed_metric_->Increment(1);
-    return status;
-  }
-  Status request_status = LlamaStatusToImpalaStatus(ll_response.status);
-  if (!request_status.ok()) {
-    reservation_requests_failed_metric_->Increment(1);
-    return request_status;
-  }
-  VLOG_RPC << "Received reservation response, waiting for notification on: " 
<< res_id;
-
-  bool timed_out = false;
-  bool request_granted = WaitForNotification(request.request_timeout,
-      &response->allocated_resources, &timed_out, pending_request);
-
-  if (request_granted || timed_out) {
-    // Set the reservation_id to make sure it eventually gets released - even when
-    // timed out, since the response may arrive later.
-    response->__set_reservation_id(CastTUniqueId<llama::TUniqueId, TUniqueId>(res_id));
-  }
-
-  if (timed_out) {
-    reservation_requests_timedout_metric_->Increment(1);
-    return Status(Substitute(
-        "Resource reservation request (id=$0) exceeded timeout of $1.",
-        PrintId(res_id),
-        PrettyPrinter::Print(request.request_timeout * 1000L * 1000L,
-        TUnit::TIME_NS)));
-  }
-  reservation_response_time_metric_->Update(
-      sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
-
-  if (!request_granted) {
-    reservation_requests_rejected_metric_->Increment(1);
-    return Status(Substitute("Resource reservation request (id=$0) was 
rejected.",
-        PrintId(res_id)));
-  }
-
-  response->__set_reservation_id(CastTUniqueId<llama::TUniqueId, TUniqueId>(res_id));
-  VLOG_QUERY << "Fulfilled reservation with id: " << res_id;
-  reservation_requests_fulfilled_metric_->Increment(1);
-  return Status::OK();
-}
-
-void ResourceBroker::ClearRequests(const TUniqueId& reservation_id) {
-  int64_t total_memory_bytes = 0L;
-  int32_t total_vcpus = 0L;
-  llama::TUniqueId llama_id = CastTUniqueId<TUniqueId, llama::TUniqueId>(reservation_id);
-  {
-    lock_guard<mutex> l(pending_requests_lock_);
-    PendingExpansionIdsMap::iterator it = pending_expansion_ids_.find(llama_id);
-    if (it != pending_expansion_ids_.end()) {
-      for (const llama::TUniqueId& id: it->second) {
-        PendingRequestMap::iterator request_it = pending_requests_.find(id);
-        DCHECK(request_it != pending_requests_.end());
-        if (request_it == pending_requests_.end()) continue;
-        // It is possible that the AMNotification thread set the promise and the thread
-        // waiting on the promise hasn't had a chance to acquire the
-        // pending_requests_lock_ yet to remove it from pending_requests_. We don't need
-        // to do anything because it will be released with the reservation anyway.
-        if (request_it->second->promise()->IsSet()) continue;
-        request_it->second->SetCancelled();
-        request_it->second->promise()->Set(false);
-      }
-      it->second.clear();
-      pending_expansion_ids_.erase(it);
-    }
-  }
-  {
-    lock_guard<mutex> l(allocated_requests_lock_);
-    AllocatedRequestMap::iterator it = allocated_requests_.find(llama_id);
-    if (it == allocated_requests_.end()) return;
-    for (AllocatedRequest& allocated_req: it->second) {
-      DCHECK(allocated_req.reservation_id() == llama_id);
-      total_memory_bytes += (allocated_req.memory_mb() * 1024L * 1024L);
-      total_vcpus += allocated_req.vcpus();
-    }
-    it->second.clear();
-    allocated_requests_.erase(it);
-    allocated_memory_metric_->Increment(-total_memory_bytes);
-    allocated_vcpus_metric_->Increment(-total_vcpus);
-  }
-
-  VLOG_QUERY << "Releasing "
-             << PrettyPrinter::Print(total_memory_bytes, TUnit::BYTES)
-             << " and " << total_vcpus << " cores for " << llama_id;
-}
-
-Status ResourceBroker::ReleaseRequest(const llama::TUniqueId& request_id) {
-  llama::TLlamaAMReleaseRequest llama_request;
-  llama::TLlamaAMReleaseResponse llama_response;
-  llama_request.version = llama::TLlamaServiceVersion::V1;
-  llama_request.am_handle = llama_handle_;
-  llama_request.reservation_id = request_id;
-
-  RETURN_IF_ERROR(LlamaRpc(&llama::LlamaAMServiceClient::Release,
-          &llama_request, &llama_response,reservation_rpc_time_metric_));
-  RETURN_IF_ERROR(LlamaStatusToImpalaStatus(llama_response.status));
-  return Status::OK();
-}
-
-Status ResourceBroker::ReleaseReservation(const impala::TUniqueId& reservation_id) {
-  VLOG_QUERY << "Releasing all resources for reservation: " << reservation_id;
-  llama::TUniqueId llama_id = CastTUniqueId<TUniqueId, llama::TUniqueId>(reservation_id);
-
-  ClearRequests(reservation_id);
-  RETURN_IF_ERROR(ReleaseRequest(llama_id));
-  requests_released_metric_->Increment(1);
-  return Status::OK();
-}
-
-void ResourceBroker::AMNotification(const llama::TLlamaAMNotificationRequest& request,
-    llama::TLlamaAMNotificationResponse& response) {
-  {
-    // This Impalad may have restarted, so it is possible Llama is sending notifications
-    // while this Impalad is registering with Llama.
-    lock_guard<mutex> l(llama_registration_lock_);
-    if (request.am_handle != llama_handle_) {
-      VLOG_QUERY << "Ignoring Llama AM notification with mismatched AM handle. 
"
-                 << "Known handle: " << llama_handle_ << ". Received handle: "
-                 << request.am_handle;
-      // Ignore all notifications with mismatched handles.
-      return;
-    }
-  }
-  // Nothing to be done for heartbeats.
-  if (request.heartbeat) return;
-  VLOG_QUERY << "Received non-heartbeat AM notification";
-
-  lock_guard<mutex> l(pending_requests_lock_);
-
-  // Process granted allocations.
-  for (const llama::TUniqueId& res_id: request.allocated_reservation_ids) {
-    PendingRequestMap::iterator it = pending_requests_.find(res_id);
-    if (it == pending_requests_.end()) {
-      VLOG_RPC << "Allocation for " << res_id << " arrived after timeout or 
cleanup";
-      continue;
-    }
-    if (it->second->promise()->IsSet()) {
-      // The promise should not have been set unless it was already cancelled.
-      DCHECK(it->second->is_cancelled());
-      continue;
-    }
-    LOG(INFO) << "Received allocated resource for reservation id: " << res_id;
-    it->second->SetResources(request.allocated_resources);
-    it->second->promise()->Set(true);
-  }
-
-  // Process rejected allocations.
-  for (const llama::TUniqueId& res_id: request.rejected_reservation_ids) {
-    PendingRequestMap::iterator it = pending_requests_.find(res_id);
-    if (it == pending_requests_.end()) {
-      VLOG_RPC << "Rejection for " << res_id << " arrived after timeout";
-      continue;
-    }
-    if (it->second->promise()->IsSet()) {
-      DCHECK(it->second->is_cancelled());
-      continue;
-    }
-    it->second->promise()->Set(false);
-  }
-
-  // TODO: We maybe want a thread pool for handling preemptions to avoid
-  // blocking this function on query cancellations.
-  // Process preempted reservations.
-  for (const llama::TUniqueId& res_id: request.preempted_reservation_ids) {
-    TUniqueId impala_res_id;
-    impala_res_id << res_id;
-    scheduler_->HandlePreemptedReservation(impala_res_id);
-  }
-
-  // Process preempted client resources.
-  for (const llama::TUniqueId& res_id: request.preempted_client_resource_ids) {
-    TUniqueId impala_res_id;
-    impala_res_id << res_id;
-    scheduler_->HandlePreemptedResource(impala_res_id);
-  }
-
-  // Process lost client resources.
-  for (const llama::TUniqueId& res_id: request.lost_client_resource_ids) {
-    TUniqueId impala_res_id;
-    impala_res_id << res_id;
-    scheduler_->HandlePreemptedResource(impala_res_id);
-  }
-
-  response.status.__set_status_code(llama::TStatusCode::OK);
-}
-
-void ResourceBroker::NMNotification(const llama::TLlamaNMNotificationRequest& request,
-    llama::TLlamaNMNotificationResponse& response) {
-}
-
-Status ResourceBroker::RefreshLlamaNodes() {
-  llama::TLlamaAMGetNodesRequest llama_request;
-  llama_request.__set_am_handle(llama_handle_);
-  llama_request.__set_version(llama::TLlamaServiceVersion::V1);
-  llama::TLlamaAMGetNodesResponse llama_response;
-
-  RETURN_IF_ERROR(LlamaRpc(&llama::LlamaAMServiceClient::GetNodes, &llama_request,
-      &llama_response, NULL));
-  RETURN_IF_ERROR(LlamaStatusToImpalaStatus(llama_response.status));
-  llama_nodes_ = llama_response.nodes;
-  LOG(INFO) << "Llama Nodes [" << join(llama_nodes_, ", ") << "]";
-  return Status::OK();
-}
-
-bool ResourceBroker::GetQueryResourceMgr(const TUniqueId& query_id,
-    const TUniqueId& reservation_id, const TNetworkAddress& local_resource_address,
-    QueryResourceMgr** mgr) {
-  lock_guard<mutex> l(query_resource_mgrs_lock_);
-  pair<int32_t, QueryResourceMgr*>* entry = &query_resource_mgrs_[query_id];
-  if (entry->second == NULL) {
-    entry->second =
-        new QueryResourceMgr(reservation_id, local_resource_address, query_id);
-    DCHECK_EQ(entry->first, 0);
-    // Also create the per-query entries in the allocated_resources_ and
-    // pending_expansion_ids_ map.
-    llama::TUniqueId llama_id = CastTUniqueId<TUniqueId, llama::TUniqueId>(reservation_id);
-    {
-      lock_guard<mutex> pending_lock(pending_requests_lock_);
-      DCHECK(pending_expansion_ids_.find(llama_id) == pending_expansion_ids_.end());
-      pending_expansion_ids_[llama_id] = boost::unordered_set<llama::TUniqueId>();
-    }
-    {
-      lock_guard<mutex> allocated_lock(allocated_requests_lock_);
-      if (allocated_requests_.find(llama_id) == allocated_requests_.end()) {
-        allocated_requests_[llama_id] = vector<AllocatedRequest>();
-      }
-    }
-  }
-  *mgr = entry->second;
-  // Return true if this is the first reference to this resource mgr.
-  return ++entry->first == 1L;
-}
-
-void ResourceBroker::UnregisterQueryResourceMgr(const TUniqueId& query_id) {
-  lock_guard<mutex> l(query_resource_mgrs_lock_);
-  QueryResourceMgrsMap::iterator it = query_resource_mgrs_.find(query_id);
-  DCHECK(it != query_resource_mgrs_.end())
-      << "UnregisterQueryResourceMgr() without corresponding 
GetQueryResourceMgr()";
-  if (--it->second.first == 0) {
-    it->second.second->Shutdown();
-    ClearRequests(it->second.second->reservation_id());
-    delete it->second.second;
-    query_resource_mgrs_.erase(it);
-  }
-}
-
-ostream& operator<<(ostream& os,
-    const map<TNetworkAddress, llama::TAllocatedResource>& resources) {
-  typedef map<TNetworkAddress, llama::TAllocatedResource> ResourceMap;
-  int count = 0;
-  for (const ResourceMap::value_type& resource: resources) {
-    os << "(" << resource.first << "," << resource.second << ")";
-    if (++count != resources.size()) os << ",";
-  }
-  return os;
-}
-
-ostream& operator<<(ostream& os, const TResourceBrokerReservationRequest& request) {
-  os << "Reservation Request("
-     << "queue=" << request.queue << " "
-     << "user=" << request.user << " "
-     << "gang=" << request.gang << " "
-     << "request_timeout=" << request.request_timeout << " "
-     << "resources=[";
-  for (int i = 0; i < request.resources.size(); ++i) {
-    os << request.resources[i];
-    if (i + 1 != request.resources.size()) os << ",";
-  }
-  os << "])";
-  return os;
-}
-
-ostream& operator<<(ostream& os, const TResourceBrokerReservationResponse& reservation) {
-  os << "Granted Reservation("
-     << "reservation id=" << reservation.reservation_id << " "
-     << "resources=[" << reservation.allocated_resources << "])";
-  return os;
-}
-
-ostream& operator<<(ostream& os, const TResourceBrokerExpansionRequest& request) {
-  os << "Expansion Request("
-     << "reservation id=" << request.reservation_id << " "
-     << "resource=" << request.resource << " "
-     << "request_timeout=" << request.request_timeout << ")";
-  return os;
-}
-
-ostream& operator<<(ostream& os, const TResourceBrokerExpansionResponse& expansion) {
-  os << "Expansion Response("
-     << "reservation id=" << expansion.reservation_id << " "
-     << "resources=[" << expansion.allocated_resources << "])";
-  return os;
-}
-
-}
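
For reference, the GetQueryResourceMgr()/UnregisterQueryResourceMgr() pair
deleted above implemented a ref-counted "checkout": the first caller for a
query ID creates the manager, later callers share it, and the last release
shuts it down and deletes it. A minimal standalone sketch of that pattern
(Mgr, QueryId, and MgrRegistry are illustrative stand-ins, not Impala types):

    #include <cstdint>
    #include <map>
    #include <mutex>
    #include <utility>

    struct Mgr { void Shutdown() {} };  // stand-in for QueryResourceMgr
    using QueryId = int64_t;

    class MgrRegistry {
     public:
      // Returns true iff this call created the entry (the first checkout),
      // mirroring GetQueryResourceMgr()'s return-value contract.
      bool Checkout(QueryId id, Mgr** mgr) {
        std::lock_guard<std::mutex> l(lock_);
        auto& entry = map_[id];  // default-constructs {0, nullptr} on first use
        if (entry.second == nullptr) entry.second = new Mgr();
        *mgr = entry.second;
        return ++entry.first == 1;
      }

      // Deletes the entry when the last reference is returned.
      void Release(QueryId id) {
        std::lock_guard<std::mutex> l(lock_);
        auto it = map_.find(id);
        if (it == map_.end()) return;  // unmatched release; the original DCHECKed
        if (--it->second.first == 0) {
          it->second.second->Shutdown();
          delete it->second.second;
          map_.erase(it);
        }
      }

     private:
      std::mutex lock_;
      std::map<QueryId, std::pair<int32_t, Mgr*>> map_;
    };

    int main() {
      MgrRegistry registry;
      Mgr* mgr = nullptr;
      bool first = registry.Checkout(42, &mgr);  // true: first reference
      registry.Checkout(42, &mgr);               // shares the same Mgr
      registry.Release(42);
      registry.Release(42);                      // last release: Shutdown + delete
      return first ? 0 : 1;
    }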

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/resourcebroker/resource-broker.h
----------------------------------------------------------------------
diff --git a/be/src/resourcebroker/resource-broker.h b/be/src/resourcebroker/resource-broker.h
deleted file mode 100644
index b9e0bd7..0000000
--- a/be/src/resourcebroker/resource-broker.h
+++ /dev/null
@@ -1,424 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef RESOURCE_BROKER_H_
-#define RESOURCE_BROKER_H_
-
-#include <boost/unordered_map.hpp>
-#include <boost/unordered_set.hpp>
-#include <boost/scoped_ptr.hpp>
-#include <boost/uuid/uuid.hpp>
-
-#include "runtime/client-cache.h"
-#include "util/collection-metrics.h"
-#include "util/promise.h"
-#include "util/stopwatch.h"
-#include "gen-cpp/LlamaAMService.h"
-#include "gen-cpp/LlamaNotificationService.h"
-#include "gen-cpp/ResourceBrokerService_types.h"
-
-namespace impala {
-
-class QueryResourceMgr;
-class Status;
-class MetricGroup;
-class Scheduler;
-class ResourceBrokerNotificationServiceClient;
-class RuntimeProfile;
-
-/// Mediates resource-reservation requests between Impala and Yarn via the Llama service.
-/// The resource broker requests resources via the Llama's thrift interface and exposes
-/// a thrift server for the Llama to notify it of granted/denied/preempted resource
-/// reservations. The reserve/release API of the resource broker is blocking.
-/// The resource broker is configured with a list of Llama addresses that
-/// are cycled through for failover.
-/// TODO: Implement NM notification service.
-class ResourceBroker {
- public:
-  ResourceBroker(const std::vector<TNetworkAddress>& llama_addresses,
-      const TNetworkAddress& llama_callback_address, MetricGroup* metrics);
-
-  /// Registers this resource broker with Llama and starts the Llama callback service.
-  /// Returns a non-OK status if the callback service failed to start (e.g., port in use)
-  /// or if registration with the Llama failed (e.g., connection to Llama failed).
-  Status Init();
-
-  /// Closes the llama_client_cache_ and joins the llama_callback_server_.
-  void Close();
-
-  /// Requests resources from Llama. Blocks until the request has been granted or denied.
-  /// TODO: Remove thrift interface
-  Status Reserve(const TResourceBrokerReservationRequest& request,
-      TResourceBrokerReservationResponse* response);
-
-  /// Requests more resources specified by 'resource' from Llama for an existing
-  /// reservation specified by the 'reservation_id'. Blocks until the request has been
-  /// granted or rejected, or no response is received within the timeout specified, in
-  /// which case a call to cancel the outstanding expansion is made and the call returns
-  /// with an error status. If timeout_ms <= 0, the call will not time out. If the
-  /// expansion is successful, an OK status is returned and the 'expansion_id' and
-  /// 'allocated_resource' are set. An error status is returned if a timeout or an error
-  /// occurs.
-  Status Expand(const TUniqueId& reservation_id, const llama::TResource& resource,
-      int64_t timeout_ms, llama::TUniqueId* expansion_id,
-      llama::TAllocatedResource* allocated_resource);
-
-  /// Removes the record of all resource requests associated with this
-  /// 'reservation_id', updates the per-node accounting of resources, and cancels any
-  /// threads waiting on pending expansions. Does not communicate this to Llama, so the
-  /// coordinator should always call ReleaseReservation() to make sure that Llama knows
-  /// the resources should be released.
-  void ClearRequests(const TUniqueId& reservation_id);
-
-  /// Releases resources acquired from Llama for this reservation and all associated
-  /// expansion requests across _all_ nodes. Should therefore only be called once per
-  /// query by the coordinator.
-  Status ReleaseReservation(const TUniqueId& reservation_id);
-
-  /// Handles asynchronous Llama Application Master (AM) notifications including
-  /// granted/denied/preempted reservations and resources.
-  void AMNotification(const llama::TLlamaAMNotificationRequest& request,
-      llama::TLlamaAMNotificationResponse& response);
-
-  /// Handles asynchronous notifications from the Llama Node Manager (NM)
-  /// auxiliary service, in particular, incoming Yarn container allocations
-  /// that are going to claim resources.
-  /// TODO: Implement once NM service is fully functional.
-  void NMNotification(const llama::TLlamaNMNotificationRequest& request,
-      llama::TLlamaNMNotificationResponse& response);
-
-  const std::vector<std::string>& llama_nodes() { return llama_nodes_; }
-
-  /// Retrieves the nodes known to Llama and stores them in llama_nodes_.
-  Status RefreshLlamaNodes();
-
-  void set_scheduler(Scheduler* scheduler) { scheduler_ = scheduler; }
-
-  /// Retrieves or creates a new QueryResourceMgr for the given query ID. Returns true if
-  /// this is the first 'checkout' of this QueryResourceMgr, false otherwise. The other
-  /// parameters are passed to the QueryResourceMgr constructor.
-  bool GetQueryResourceMgr(const TUniqueId& query_id, const TUniqueId& reservation_id,
-      const TNetworkAddress& local_resource_address, QueryResourceMgr** res_mgr);
-
-  /// Decrements the reference count for a particular QueryResourceMgr. If this is the last
-  /// reference (i.e. the ref count goes to 0), the QueryResourceMgr is deleted. It's an
-  /// error to call this with a query_id that does not have a registered QueryResourceMgr.
-  void UnregisterQueryResourceMgr(const TUniqueId& query_id);
-
- private:
-  typedef std::map<TNetworkAddress, llama::TAllocatedResource> ResourceMap;
-
-  bool has_standby_llama() { return llama_addresses_.size() > 1; }
-
-  /// Registers this resource broker with the Llama. Cycles through the list of
-  /// Llama addresses to find the active Llama which is accepting requests (if any).
-  /// Returns a non-OK status if registration with any of the Llamas did not succeed
-  /// within FLAGS_llama_registration_timeout_s seconds.
-  /// Registration with the Llama is idempotent with respect to the llama_client_id_
-  /// (see comment on llama_client_id_ for details).
-  Status RegisterWithLlama();
-
-  /// Issues the Llama RPC f where F is a thrift call taking LlamaReqType and returning
-  /// LlamaRespType. If failures occur, this function handles re-registering with Llama
-  /// if necessary and retrying multiple times. If rpc_time_metric is non-NULL, the
-  /// metric is updated upon success of the RPC. Returns a non-OK status if the RPC
-  /// failed due to connectivity issues with the Llama. Returns OK if the RPC succeeded.
-  template <class F, typename LlamaReqType, typename LlamaRespType>
-  Status LlamaRpc(const F& f, LlamaReqType* request, LlamaRespType* response,
-      StatsMetric<double>* rpc_time_metric);
-
-  /// Re-registers with Llama to recover from the Llama being unreachable. Handles both
-  /// Llama restart and failover. This function is a template to allow specialization on
-  /// the Llama request/response type.
-  template <typename LlamaReqType, typename LlamaRespType>
-  Status ReRegisterWithLlama(const LlamaReqType& request, LlamaRespType* response);
-
-  /// Detects Llama restarts from the given return status of a Llama RPC.
-  bool LlamaHasRestarted(const llama::TStatus& status) const;
-
-  /// Sends a Llama release RPC for the reservation or expansion with the specified
-  /// request_id.
-  Status ReleaseRequest(const llama::TUniqueId& request_id);
-
-  /// Creates a Llama reservation request from a resource broker reservation request.
-  void CreateLlamaReservationRequest(const TResourceBrokerReservationRequest& src,
-      llama::TLlamaAMReservationRequest& dest);
-
-  class PendingRequest;
-  /// Waits for a reservation or expansion request to be fulfilled by the Llama via an
-  /// async call into LlamaNotificationThriftIf::AMNotification(). If timeout_ms > 0, the
-  /// call will not wait longer than timeout_ms before returning false with *timed_out set
-  /// to true. If the request is fulfilled, resources and reservation_id are populated.
-  bool WaitForNotification(int64_t timeout_ms, ResourceMap* resources, bool* timed_out,
-      PendingRequest* reservation);
-
-  /// Llama availability group.
-  std::vector<TNetworkAddress> llama_addresses_;
-
-  /// Index into llama_addresses_ indicating the currently active Llama.
-  /// Protected by llama_registration_lock_.
-  int active_llama_addr_idx_;
-
-  /// Address of thrift server started in this resource broker to handle
-  /// Llama notifications.
-  TNetworkAddress llama_callback_address_;
-
-  MetricGroup* metrics_;
-
-  Scheduler* scheduler_;
-
-  /// Address of the active Llama. A Llama is considered active once we have successfully
-  /// registered with it. Set to "none" while registering with the Llama.
-  StringProperty* active_llama_metric_;
-
-  /// Llama handle received from the active Llama upon registration.
-  /// Set to "none" while not registered with Llama.
-  StringProperty* active_llama_handle_metric_;
-
-  /// Accumulated statistics on the time taken to RPC a reservation request and receive
-  /// an acknowledgement from Llama.
-  StatsMetric<double>* reservation_rpc_time_metric_;
-
-  /// Accumulated statistics on the time taken to complete a reservation request
-  /// (granted or denied). The time includes the request RPC to Llama and the time
-  /// the requesting thread waits on the pending_requests_'s promise.
-  /// The metric does not include requests that timed out.
-  StatsMetric<double>* reservation_response_time_metric_;
-
-  /// Total number of reservation requests.
-  IntCounter* reservation_requests_total_metric_;
-
-  /// Number of fulfilled reservation requests.
-  IntCounter* reservation_requests_fulfilled_metric_;
-
-  /// Reservation requests that failed due to a malformed request or an internal
-  /// error in Llama.
-  IntCounter* reservation_requests_failed_metric_;
-
-  /// Number of well-formed reservation requests rejected by the central scheduler.
-  IntCounter* reservation_requests_rejected_metric_;
-
-  /// Number of well-formed reservation requests that did not get fulfilled within
-  /// the timeout period.
-  IntCounter* reservation_requests_timedout_metric_;
-
-  /// Accumulated statistics on the time taken to RPC an expansion request and receive an
-  /// acknowledgement from Llama.
-  StatsMetric<double>* expansion_rpc_time_metric_;
-
-  /// Accumulated statistics on the time taken to complete an expansion request
-  /// (granted or denied). The time includes the request RPC to Llama and the time
-  /// the requesting thread waits on the pending_requests_'s promise.
-  /// The metric does not include requests that timed out.
-  StatsMetric<double>* expansion_response_time_metric_;
-
-  /// Total number of expansion requests.
-  IntCounter* expansion_requests_total_metric_;
-
-  /// Number of fulfilled expansion requests.
-  IntCounter* expansion_requests_fulfilled_metric_;
-
-  /// Expansion requests that failed due to a malformed request or an internal
-  /// error in Llama.
-  IntCounter* expansion_requests_failed_metric_;
-
-  /// Number of well-formed expansion requests rejected by the central scheduler.
-  IntCounter* expansion_requests_rejected_metric_;
-
-  /// Number of well-formed expansion requests that did not get fulfilled within
-  /// the timeout period.
-  IntCounter* expansion_requests_timedout_metric_;
-
-  /// Total amount of memory currently allocated by Llama to this node
-  UIntGauge* allocated_memory_metric_;
-
-  /// Total number of vcpu cores currently allocated by Llama to this node
-  UIntGauge* allocated_vcpus_metric_;
-
-  /// Total number of fulfilled reservation requests that have been released.
-  IntCounter* requests_released_metric_;
-
-  /// Client id used to register with Llama. Set in Init(). Used to communicate to Llama
-  /// whether this Impalad has restarted. Registration with Llama is idempotent if the
-  /// same llama_client_id_ is passed, i.e., the same Llama handle is returned and
-  /// resource allocations are preserved. From Llama's perspective an unknown
-  /// llama_client_id_ indicates a new registration, and all resources allocated by this
-  /// Impalad under a different llama_client_id_ are considered lost and will be released.
-  boost::uuids::uuid llama_client_id_;
-
-  /// Thrift API implementation which proxies Llama notifications onto this ResourceBroker.
-  boost::shared_ptr<llama::LlamaNotificationServiceIf> llama_callback_thrift_iface_;
-  boost::scoped_ptr<ThriftServer> llama_callback_server_;
-
-  /// Cache of Llama client connections.
-  boost::scoped_ptr<ClientCache<llama::LlamaAMServiceClient>> llama_client_cache_;
-
-  /// Lock to ensure that only a single registration with Llama is sent, e.g.,
-  /// when multiple concurrent requests realize that Llama has restarted.
-  boost::mutex llama_registration_lock_;
-
-  /// Handle received from Llama during registration. Set in RegisterWithLlama().
-  llama::TUniqueId llama_handle_;
-
-  /// List of nodes registered with Llama. Set in RefreshLlamaNodes().
-  std::vector<std::string> llama_nodes_;
-
-  /// A PendingRequest tracks a single reservation or expansion request that is in flight
-  /// to Llama. A new PendingRequest is created in either Expand() or Reserve(), and its
-  /// promise() is blocked on there until a response is received for that request from
-  /// Llama via AMNotification(), or until a timeout occurs.
-  //
-  /// Every request has a unique request_id which is assigned by the resource broker. Each
-  /// request is also associated with exactly one reservation, via reservation_id(). This
-  /// allows us to track which resources belong to which reservation, and to make sure that
-  /// all are correctly accounted for when the reservation is released. Each reservation ID
-  /// will belong to exactly one reservation request, and 0 or more expansion requests.
-  class PendingRequest {
-   public:
-    PendingRequest(const llama::TUniqueId& reservation_id,
-        const llama::TUniqueId& request_id, bool is_expansion)
-        : reservation_id_(reservation_id), request_id_(request_id),
-          is_expansion_(is_expansion), is_cancelled_(false) {
-      DCHECK(is_expansion || reservation_id == request_id);
-    }
-
-    /// Promise is set to true if the reservation or expansion request was granted, false
-    /// if it was rejected by Yarn. When promise()->Get() returns true,
-    /// allocated_resources_ will be populated and it will be safe to call GetResources().
-    Promise<bool>* promise() { return &promise_; }
-
-    /// Called by WaitForNotification() to populate a map of resources once the
-    /// corresponding request has returned successfully (and promise() therefore has
-    /// returned true).
-    void GetResources(ResourceMap* resources);
-
-    /// Populates allocated_resources_ from all members of resources that match the given
-    /// reservation id. Called in AMNotification().
-    void SetResources(const std::vector<llama::TAllocatedResource>& resources);
-
-    const llama::TUniqueId& request_id() const { return request_id_; }
-    const llama::TUniqueId& reservation_id() const { return reservation_id_; }
-
-    bool is_expansion() const { return is_expansion_; }
-    bool is_cancelled() const { return is_cancelled_; }
-
-    /// Sets the cancelled flag to true. Is only called before the promise is set and
-    /// while the pending_requests_lock_ is held to avoid races.
-    void SetCancelled() { is_cancelled_ = true; }
-
-   private:
-    /// Promise object that WaitForNotification() waits on and AMNotification() signals.
-    Promise<bool> promise_;
-
-    /// Filled in by AMNotification(), so that WaitForNotification() can read the set of
-    /// allocated_resources without AMNotification() having to wait (hence the copy is
-    /// deliberate, since the original copy may go out of scope).
-    std::vector<llama::TAllocatedResource> allocated_resources_;
-
-    /// The ID for the reservation associated with this request. There is always exactly
-    /// one reservation associated with every request.
-    llama::TUniqueId reservation_id_;
-
-    /// The unique ID for this request. If this is a reservation request, request_id_ ==
-    /// reservation_id_, otherwise this is generated during Expand().
-    llama::TUniqueId request_id_;
-
-    /// True if this is an expansion request, false if it is a reservation request
-    bool is_expansion_;
-
-    /// Set if the request was cancelled.
-    bool is_cancelled_;
-  };
-
-  /// Protects pending_requests_ and pending_expansion_ids_
-  boost::mutex pending_requests_lock_;
-
-  /// Map from unique request ID provided to Llama (for both reservation and expansion
-  /// requests) to PendingRequest object used to coordinate when a response is received
-  /// from Llama.
-  typedef boost::unordered_map<llama::TUniqueId, PendingRequest*> PendingRequestMap;
-  PendingRequestMap pending_requests_;
-
-  /// Map from reservation IDs to pending expansion IDs. All pending request IDs have a
-  /// PendingRequest in pending_requests_.
-  typedef boost::unordered_map<llama::TUniqueId, boost::unordered_set<llama::TUniqueId>>
-      PendingExpansionIdsMap;
-  PendingExpansionIdsMap pending_expansion_ids_;
-
-  /// An AllocatedRequest tracks resources allocated in response to one reservation or
-  /// expansion request.
-  class AllocatedRequest {
-   public:
-    AllocatedRequest(const llama::TUniqueId& reservation_id,
-        uint64_t memory_mb, uint32_t vcpus, bool is_expansion)
-        : reservation_id_(reservation_id), memory_mb_(memory_mb), vcpus_(vcpus),
-          is_expansion_(is_expansion) { }
-
-    const llama::TUniqueId reservation_id() const { return reservation_id_; }
-    uint64_t memory_mb() const { return memory_mb_; }
-    uint32_t vcpus() const { return vcpus_; }
-    bool is_expansion() const { return is_expansion_; }
-
-   private:
-    /// The reservation ID for this request. Expansions all share the same reservation ID.
-    llama::TUniqueId reservation_id_;
-
-    /// The total memory allocated to this request
-    uint64_t memory_mb_;
-
-    /// The number of VCPUs allocated to this request
-    uint32_t vcpus_;
-
-    /// True if this is an expansion request, false if it is a reservation request
-    bool is_expansion_;
-  };
-
-  /// Protects allocated_requests_
-  boost::mutex allocated_requests_lock_;
-
-  /// Map from reservation ID to all satisfied requests - reservation and expansion -
-  /// associated with that reservation. Used only for bookkeeping so that Impala can report
-  /// on the current resource usage.
-  typedef boost::unordered_map<llama::TUniqueId, std::vector<AllocatedRequest>>
-      AllocatedRequestMap;
-  AllocatedRequestMap allocated_requests_;
-
-  /// Protects query_resource_mgrs_
-  boost::mutex query_resource_mgrs_lock_;
-  typedef boost::unordered_map<TUniqueId, std::pair<int32_t, QueryResourceMgr*>>
-      QueryResourceMgrsMap;
-
-  /// Map from query ID to a (ref_count, QueryResourceMgr*) pair, i.e. one QueryResourceMgr
-  /// per query. The reference count is always non-zero - once it hits zero the entry in
-  /// the map is removed and the QueryResourceMgr is deleted.
-  QueryResourceMgrsMap query_resource_mgrs_;
-};
-
-std::ostream& operator<<(std::ostream& os,
-    const TResourceBrokerReservationRequest& request);
-
-std::ostream& operator<<(std::ostream& os,
-    const TResourceBrokerReservationResponse& reservation);
-
-std::ostream& operator<<(std::ostream& os,
-    const TResourceBrokerExpansionRequest& request);
-
-std::ostream& operator<<(std::ostream& os,
-    const TResourceBrokerExpansionResponse& expansion);
-}
-
-#endif
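
The header above documents the broker's central rendezvous: Reserve() and
Expand() block on a per-request promise that the asynchronous AMNotification()
callback later fulfils, with a timeout as the fallback. A minimal sketch of
that pattern using std::promise/std::future in place of Impala's Promise class
(all names and timings here are illustrative):

    #include <chrono>
    #include <future>
    #include <iostream>
    #include <thread>

    int main() {
      std::promise<bool> granted;  // set by the "AMNotification" path
      std::future<bool> result = granted.get_future();

      // Simulate the async grant/reject notification from the resource manager.
      std::thread notifier([&granted] {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        granted.set_value(true);  // request granted
      });

      // The "Reserve" path: block until granted/rejected or the timeout fires.
      if (result.wait_for(std::chrono::seconds(1)) == std::future_status::ready) {
        std::cout << (result.get() ? "granted" : "rejected") << std::endl;
      } else {
        std::cout << "timed out; would cancel the pending request" << std::endl;
      }
      notifier.join();
      return 0;
    }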

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/buffered-block-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr-test.cc b/be/src/runtime/buffered-block-mgr-test.cc
index 5b5ee8a..169baf2 100644
--- a/be/src/runtime/buffered-block-mgr-test.cc
+++ b/be/src/runtime/buffered-block-mgr-test.cc
@@ -553,8 +553,8 @@ class BufferedBlockMgrTest : public ::testing::Test {
     const int num_threads = 8;
     thread_group workers;
     // Create a shared RuntimeState with no BufferedBlockMgr.
-    RuntimeState* shared_state = new RuntimeState(TExecPlanFragmentParams(), "",
-        test_env_->exec_env());
+    RuntimeState* shared_state =
+        new RuntimeState(TExecPlanFragmentParams(), test_env_->exec_env());
     for (int i = 0; i < num_threads; ++i) {
       thread* t = new thread(bind(
           &BufferedBlockMgrTest::CreateDestroyThread, this, shared_state));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/buffered-block-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc
index 90c1041..d582c31 100644
--- a/be/src/runtime/buffered-block-mgr.cc
+++ b/be/src/runtime/buffered-block-mgr.cc
@@ -1317,8 +1317,8 @@ void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, RuntimeProfile* parent_profile,
   integrity_check_timer_ = ADD_TIMER(profile_.get(), "TotalIntegrityCheckTime");
 
   // Create a new mem_tracker and allocate buffers.
-  mem_tracker_.reset(new MemTracker(
-      profile(), mem_limit, -1, "Block Manager", parent_tracker));
+  mem_tracker_.reset(
+      new MemTracker(profile(), mem_limit, "Block Manager", parent_tracker));
 
   initialized_ = true;
 }
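
The MemTracker call-site edits in this and the following hunks all drop the
same argument: the RM reservation limit (the second "-1") that sat between the
byte limit and the label, leaving a limit/label/parent shape. A toy sketch of
a hierarchical tracker with that simplified constructor (ToyTracker and its
parameter names are assumptions for illustration, not mem-tracker.h's API):

    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <utility>

    class ToyTracker {
     public:
      ToyTracker(int64_t byte_limit, std::string label, ToyTracker* parent = nullptr)
          : byte_limit_(byte_limit), label_(std::move(label)), parent_(parent) {}

      // Checks every ancestor's limit first, then charges the whole chain, so
      // child consumption counts against parents.
      bool TryConsume(int64_t bytes) {
        for (ToyTracker* t = this; t != nullptr; t = t->parent_) {
          if (t->byte_limit_ >= 0 && t->consumed_ + bytes > t->byte_limit_) return false;
        }
        for (ToyTracker* t = this; t != nullptr; t = t->parent_) t->consumed_ += bytes;
        return true;
      }

     private:
      int64_t byte_limit_;  // -1 means "no limit", as on the lines above
      std::string label_;
      ToyTracker* parent_;
      int64_t consumed_ = 0;
    };

    int main() {
      ToyTracker process(1024, "process");
      ToyTracker query(-1, "query", &process);  // unlimited, but bounded by parent
      std::cout << query.TryConsume(512) << query.TryConsume(1024) << std::endl;  // prints 10
      return 0;
    }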

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/collection-value-builder-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/collection-value-builder-test.cc b/be/src/runtime/collection-value-builder-test.cc
index c7843b7..b8f4b65 100644
--- a/be/src/runtime/collection-value-builder-test.cc
+++ b/be/src/runtime/collection-value-builder-test.cc
@@ -40,7 +40,7 @@ TEST(CollectionValueBuilderTest, MaxBufferSize) {
   CollectionValue coll_value;
   int64_t initial_capacity = (INT_MAX / 8) + 1;
   int64_t mem_limit = initial_capacity * 4 * 4;
-  MemTracker tracker(mem_limit, mem_limit);
+  MemTracker tracker(mem_limit);
   MemPool pool(&tracker);
   CollectionValueBuilder coll_value_builder(
       &coll_value, tuple_desc, &pool, NULL, initial_capacity);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 4728866..e2dd1a4 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -62,7 +62,6 @@
 #include "util/error-util.h"
 #include "util/hdfs-bulk-ops.h"
 #include "util/hdfs-util.h"
-#include "util/llama-util.h"
 #include "util/network-util.h"
 #include "util/pretty-printer.h"
 #include "util/summary-util.h"
@@ -486,7 +485,7 @@ Status Coordinator::Exec(QuerySchedule& schedule,
         runtime_state()->obj_pool(), request.fragments[0].output_exprs,
         output_expr_ctxs));
     MemTracker* output_expr_tracker = runtime_state()->obj_pool()->Add(new MemTracker(
-        -1, -1, "Output exprs", runtime_state()->instance_mem_tracker(), false));
+        -1, "Output exprs", runtime_state()->instance_mem_tracker(), false));
     RETURN_IF_ERROR(Expr::Prepare(
         *output_expr_ctxs, runtime_state(), row_desc(), output_expr_tracker));
   } else {
@@ -503,12 +502,12 @@ Status Coordinator::Exec(QuerySchedule& schedule,
     MemTracker* pool_tracker = MemTracker::GetRequestPoolMemTracker(
         schedule.request_pool(), exec_env_->process_mem_tracker());
     query_mem_tracker_ =
-        MemTracker::GetQueryMemTracker(query_id_, query_limit, -1, pool_tracker, NULL);
+        MemTracker::GetQueryMemTracker(query_id_, query_limit, pool_tracker);
 
     executor_.reset(NULL);
   }
-  filter_mem_tracker_.reset(new MemTracker(-1, -1, "Runtime Filter (Coordinator)",
-      query_mem_tracker(), false));
+  filter_mem_tracker_.reset(
+      new MemTracker(-1, "Runtime Filter (Coordinator)", query_mem_tracker(), false));
 
   // Initialize the execution profile structures.
   InitExecProfile(request);
@@ -1900,20 +1899,6 @@ void Coordinator::SetExecPlanFragmentParams(QuerySchedule& schedule,
   SetExecPlanDescriptorTable(fragment, rpc_params);
 
   TNetworkAddress exec_host = params.hosts[fragment_instance_idx];
-  if (schedule.HasReservation()) {
-    // The reservation has already been validated at this point.
-    TNetworkAddress resource_hostport;
-    schedule.GetResourceHostport(exec_host, &resource_hostport);
-    map<TNetworkAddress, llama::TAllocatedResource>::const_iterator it =
-        schedule.reservation()->allocated_resources.find(resource_hostport);
-    // Only set reserved resource if we actually have one for this plan
-    // fragment. Otherwise, don't set it (usually this is the coordinator fragment), and it
-    // won't participate in dynamic RM controls.
-    if (it != schedule.reservation()->allocated_resources.end()) {
-      fragment_instance_ctx.__set_reserved_resource(it->second);
-      fragment_instance_ctx.__set_local_resource_address(resource_hostport);
-    }
-  }
   FragmentScanRangeAssignment::const_iterator it =
       params.scan_range_assignment.find(exec_host);
   // Scan ranges may not always be set, so use an empty structure if so.
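
The block deleted from SetExecPlanFragmentParams() gated RM participation on a
per-host lookup: a fragment instance was handed a reserved resource only if
its host appeared in the reservation's allocation map, and otherwise opted out
of dynamic RM controls. A small sketch of that conditional-lookup shape, with
plain std types standing in for the Llama/Thrift structs:

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    int main() {
      // Host -> allocated memory, standing in for the reservation's
      // allocated_resources map keyed by TNetworkAddress.
      std::map<std::string, int64_t> allocated_mb = {{"host1:22000", 4096}};
      std::vector<std::string> exec_hosts = {"host1:22000", "host2:22000"};
      for (const std::string& host : exec_hosts) {
        auto it = allocated_mb.find(host);
        if (it != allocated_mb.end()) {
          std::cout << host << ": reserved " << it->second << " MB" << std::endl;
        } else {
          // No allocation for this host (typically the coordinator fragment):
          // leave the optional field unset so the fragment skips RM controls.
          std::cout << host << ": no reservation" << std::endl;
        }
      }
      return 0;
    }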

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc
index 9bd51af..50103ba 100644
--- a/be/src/runtime/data-stream-recvr.cc
+++ b/be/src/runtime/data-stream-recvr.cc
@@ -295,7 +295,7 @@ DataStreamRecvr::DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_t
     is_merging_(is_merging),
     num_buffered_bytes_(0),
     profile_(profile) {
-  mem_tracker_.reset(new MemTracker(-1, -1, "DataStreamRecvr", parent_tracker));
+  mem_tracker_.reset(new MemTracker(-1, "DataStreamRecvr", parent_tracker));
   // Create one queue per sender if is_merging is true.
   int num_queues = is_merging ? num_senders : 1;
   sender_queues_.reserve(num_queues);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index dae4724..f5eb783 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -112,9 +112,7 @@ class ImpalaTestBackend : public ImpalaInternalServiceIf {
 
 class DataStreamTest : public testing::Test {
  protected:
-  DataStreamTest()
-    : runtime_state_(TExecPlanFragmentParams(), "", &exec_env_),
-      next_val_(0) {
+  DataStreamTest() : runtime_state_(TExecPlanFragmentParams(), &exec_env_), next_val_(0) {
     // Initialize Mem trackers for use by the data stream receiver.
     exec_env_.InitForFeTests();
     runtime_state_.InitMemTrackers(TUniqueId(), NULL, -1);
@@ -482,7 +480,7 @@ class DataStreamTest : public testing::Test {
 
   void Sender(int sender_num, int channel_buffer_size,
               TPartitionType::type partition_type) {
-    RuntimeState state(TExecPlanFragmentParams(), "", &exec_env_);
+    RuntimeState state(TExecPlanFragmentParams(), &exec_env_);
     state.set_desc_tbl(desc_tbl_);
     state.InitMemTrackers(TUniqueId(), NULL, -1);
     VLOG_QUERY << "create sender " << sender_num;
@@ -596,7 +594,7 @@ TEST_F(DataStreamTest, BasicTest) {
 // TODO: Make lifecycle requirements more explicit.
 TEST_F(DataStreamTest, CloseRecvrWhileReferencesRemain) {
   scoped_ptr<RuntimeState> runtime_state(
-      new RuntimeState(TExecPlanFragmentParams(), "", &exec_env_));
+      new RuntimeState(TExecPlanFragmentParams(), &exec_env_));
   runtime_state->InitMemTrackers(TUniqueId(), NULL, -1);
 
   scoped_ptr<RuntimeProfile> profile(new RuntimeProfile(&obj_pool_, "TestReceiver"));

