http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
new file mode 100644
index 0000000..fe9cca2
--- /dev/null
+++ b/be/src/runtime/query-state.h
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_RUNTIME_QUERY_STATE_H
+#define IMPALA_RUNTIME_QUERY_STATE_H
+
+#include <boost/thread/mutex.hpp>
+#include <unordered_map>
+
+#include "common/object-pool.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
+#include "gen-cpp/Types_types.h"
+#include "util/uid-util.h"
+#include "common/atomic.h"
+
+namespace impala {
+
+class FragmentInstanceState;
+
+/// Central class for all backend execution state (example: the FragmentInstanceStates
+/// of the individual fragment instances) created for a particular query.
+/// This class contains or makes accessible state that is shared across fragment
+/// instances; in contrast, fragment instance-specific state is collected in
+/// FragmentInstanceState.
+///
+/// The lifetime of an instance of this class is dictated by a reference count.
+/// Any thread that executes on behalf of a query, and accesses any of its state,
+/// must obtain a reference to the corresponding QueryState and hold it for at least the
+/// duration of that access. The reference is obtained and released via
+/// QueryExecMgr::Get-/ReleaseQueryState() or via QueryState::ScopedRef (the latter
+/// for references limited to the scope of a single function or block).
+/// As long as the reference count is greater than 0, all query state (contained
+/// either in this class or accessible through this class, such as the
+/// FragmentInstanceStates) is guaranteed to be alive.
+///
+/// Thread-safe, unless noted otherwise.
+class QueryState {
+ public:
+  /// Use this class to obtain a QueryState for the duration of a function/block,
+  /// rather than manually via QueryExecMgr::Get-/ReleaseQueryState().
+  /// Pattern:
+  /// {
+  ///   QueryState::ScopedRef qs(qid);
+  ///   if (qs.get() == nullptr) <do something, such as return>
+  ///   ...
+  /// }
+  class ScopedRef {
+   public:
+    explicit ScopedRef(const TUniqueId& query_id);
+    ~ScopedRef();
+
+    /// Both accessors may return nullptr; check get() before using operator->.
+    QueryState* get() const { return query_state_; }
+    QueryState* operator->() const { return query_state_; }
+
+   private:
+    QueryState* query_state_;
+    DISALLOW_COPY_AND_ASSIGN(ScopedRef);
+  };
+
+  /// a shared pool for all objects that have query lifetime
+  ObjectPool* obj_pool() { return &obj_pool_; }
+
+  /// This TQueryCtx was copied from the first fragment instance which led to the
+  /// creation of this QueryState. For all subsequently arriving fragment instances the
+  /// desc_tbl in this context will be incorrect, therefore query_ctx().desc_tbl should
+  /// not be used. This restriction will go away with the switch to a per-query exec
+  /// rpc.
+  const TQueryCtx& query_ctx() const { return query_ctx_; }
+
+  const TUniqueId& query_id() const { return query_ctx_.query_id; }
+
+  /// Registers a new FInstanceState.
+  void RegisterFInstance(FragmentInstanceState* fis);
+
+  /// Returns the instance state or nullptr if the instance id has not previously
+  /// been registered. The returned FIS is valid for the duration of the QueryState.
+  FragmentInstanceState* GetFInstanceState(const TUniqueId& instance_id);
+
+ private:
+  friend class QueryExecMgr;
+  friend class TestEnv;
+
+  static const int DEFAULT_BATCH_SIZE = 1024;
+
+  TQueryCtx query_ctx_;
+
+  ObjectPool obj_pool_;
+  AtomicInt32 refcnt_;  // the reference count described in the class comment
+
+  boost::mutex fis_map_lock_;  // protects fis_map_
+
+  /// map from instance id to its state (owned by obj_pool_)
+  std::unordered_map<TUniqueId, FragmentInstanceState*> fis_map_;
+
+  /// Create QueryState w/ copy of query_ctx and refcnt of 0.
+  QueryState(const TQueryCtx& query_ctx);
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/runtime-filter-bank.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index 53755f9..759b505 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -45,12 +45,12 @@ RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* s
       state->runtime_profile()->AddCounter("BloomFilterBytes", TUnit::BYTES);
 
   // Clamp bloom filter size down to the limits {MIN,MAX}_BLOOM_FILTER_SIZE
-  max_filter_size_ = query_ctx.request.query_options.runtime_filter_max_size;
+  max_filter_size_ = 
query_ctx.client_request.query_options.runtime_filter_max_size;
   max_filter_size_ = max<int64_t>(max_filter_size_, MIN_BLOOM_FILTER_SIZE);
   max_filter_size_ =
       BitUtil::RoundUpToPowerOfTwo(min<int64_t>(max_filter_size_, 
MAX_BLOOM_FILTER_SIZE));
 
-  min_filter_size_ = query_ctx.request.query_options.runtime_filter_min_size;
+  min_filter_size_ = 
query_ctx.client_request.query_options.runtime_filter_min_size;
   min_filter_size_ = max<int64_t>(min_filter_size_, MIN_BLOOM_FILTER_SIZE);
   min_filter_size_ =
       BitUtil::RoundUpToPowerOfTwo(min<int64_t>(min_filter_size_, 
MAX_BLOOM_FILTER_SIZE));
@@ -61,7 +61,7 @@ RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* s
   DCHECK_GT(min_filter_size_, 0);
   DCHECK_GT(max_filter_size_, 0);
 
-  default_filter_size_ = 
query_ctx.request.query_options.runtime_bloom_filter_size;
+  default_filter_size_ = 
query_ctx.client_request.query_options.runtime_bloom_filter_size;
   default_filter_size_ = max<int64_t>(default_filter_size_, min_filter_size_);
   default_filter_size_ =
       BitUtil::RoundUpToPowerOfTwo(min<int64_t>(default_filter_size_, 
max_filter_size_));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index a0c3335..5be4f1c 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -32,12 +32,14 @@
 #include "exprs/expr.h"
 #include "exprs/scalar-fn-call.h"
 #include "runtime/buffered-block-mgr.h"
+#include "runtime/exec-env.h"
 #include "runtime/descriptors.h"
 #include "runtime/data-stream-mgr.h"
 #include "runtime/data-stream-recvr.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/runtime-filter-bank.h"
 #include "runtime/timestamp-value.h"
+#include "runtime/query-state.h"
 #include "util/bitmap.h"
 #include "util/cpu-info.h"
 #include "util/debug-util.h"
@@ -46,6 +48,7 @@
 #include "util/jni-util.h"
 #include "util/mem-info.h"
 #include "util/pretty-printer.h"
+#include "util/auth-util.h" // for GetEffectiveUser()
 
 #include "common/names.h"
 
@@ -69,28 +72,35 @@ static const int64_t BLOCK_MGR_MEM_MIN_REMAINING = 100 * 1024 * 1024;
 namespace impala {
 
 RuntimeState::RuntimeState(
-    const TExecPlanFragmentParams& fragment_params, ExecEnv* exec_env)
-  : obj_pool_(new ObjectPool()),
-    fragment_params_(fragment_params),
+    QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
+    const TPlanFragmentInstanceCtx& instance_ctx, ExecEnv* exec_env)
+  : desc_tbl_(nullptr),
+    obj_pool_(new ObjectPool()),
+    query_state_(query_state),
+    fragment_ctx_(&fragment_ctx),
+    instance_ctx_(&instance_ctx),
     now_(new TimestampValue(
-        query_ctx().now_string.c_str(), query_ctx().now_string.size())),
-    profile_(obj_pool_.get(), "Fragment " + 
PrintId(fragment_ctx().fragment_instance_id)),
+        query_state->query_ctx().now_string.c_str(),
+        query_state->query_ctx().now_string.size())),
+    exec_env_(exec_env),
+    profile_(obj_pool_.get(), "Fragment " + 
PrintId(instance_ctx.fragment_instance_id)),
     is_cancelled_(false),
     root_node_id_(-1) {
-  Status status = Init(exec_env);
-  DCHECK(status.ok()) << status.GetDetail();
+  Init();
 }
 
-RuntimeState::RuntimeState(const TQueryCtx& query_ctx)
+RuntimeState::RuntimeState(const TQueryCtx& query_ctx, ExecEnv* exec_env)
   : obj_pool_(new ObjectPool()),
-    now_(new TimestampValue(query_ctx.now_string.c_str(),
-        query_ctx.now_string.size())),
-    exec_env_(ExecEnv::GetInstance()),
+    query_state_(nullptr),
+    fragment_ctx_(nullptr),
+    instance_ctx_(nullptr),
+    local_query_ctx_(query_ctx),
+    now_(new TimestampValue(query_ctx.now_string.c_str(), 
query_ctx.now_string.size())),
+    exec_env_(exec_env == nullptr ? ExecEnv::GetInstance() : exec_env),
     profile_(obj_pool_.get(), "<unnamed>"),
     is_cancelled_(false),
     root_node_id_(-1) {
-  fragment_params_.__set_query_ctx(query_ctx);
-  
fragment_params_.query_ctx.request.query_options.__set_batch_size(DEFAULT_BATCH_SIZE);
+  Init();
 }
 
 RuntimeState::~RuntimeState() {
@@ -110,27 +120,12 @@ RuntimeState::~RuntimeState() {
   query_mem_tracker_.reset();
 }
 
-Status RuntimeState::Init(ExecEnv* exec_env) {
+void RuntimeState::Init() {
   SCOPED_TIMER(profile_.total_time_counter());
-  exec_env_ = exec_env;
-  TQueryOptions& query_options =
-      fragment_params_.query_ctx.request.query_options;
-
-  // max_errors does not indicate how many errors in total have been recorded, 
but rather
-  // how many are distinct. It is defined as the sum of the number of generic 
errors and
-  // the number of distinct other errors.
-  if (query_options.max_errors <= 0) {
-    // TODO: fix linker error and uncomment this
-    //query_options_.max_errors = FLAGS_max_errors;
-    query_options.max_errors = 100;
-  }
-  if (query_options.batch_size <= 0) {
-    query_options.__set_batch_size(DEFAULT_BATCH_SIZE);
-  }
 
   // Register with the thread mgr
-  if (exec_env != NULL) {
-    resource_pool_ = exec_env->thread_mgr()->RegisterPool();
+  if (exec_env_ != NULL) {
+    resource_pool_ = exec_env_->thread_mgr()->RegisterPool();
     DCHECK(resource_pool_ != NULL);
   }
 
@@ -138,19 +133,16 @@ Status RuntimeState::Init(ExecEnv* exec_env) {
   total_storage_wait_timer_ = ADD_TIMER(runtime_profile(), 
"TotalStorageWaitTime");
   total_network_send_timer_ = ADD_TIMER(runtime_profile(), 
"TotalNetworkSendTime");
   total_network_receive_timer_ = ADD_TIMER(runtime_profile(), 
"TotalNetworkReceiveTime");
-
-  return Status::OK();
 }
 
-void RuntimeState::InitMemTrackers(
-    const TUniqueId& query_id, const string* pool_name, int64_t 
query_bytes_limit) {
+void RuntimeState::InitMemTrackers(const string* pool_name, int64_t 
query_bytes_limit) {
   MemTracker* query_parent_tracker = exec_env_->process_mem_tracker();
   if (pool_name != NULL) {
     query_parent_tracker = MemTracker::GetRequestPoolMemTracker(*pool_name,
         query_parent_tracker);
   }
   query_mem_tracker_ =
-      MemTracker::GetQueryMemTracker(query_id, query_bytes_limit, 
query_parent_tracker);
+      MemTracker::GetQueryMemTracker(query_id(), query_bytes_limit, 
query_parent_tracker);
   instance_mem_tracker_.reset(new MemTracker(
       runtime_profile(), -1, runtime_profile()->name(), 
query_mem_tracker_.get()));
 }
@@ -314,4 +306,43 @@ void RuntimeState::ReleaseResources() {
     exec_env_->thread_mgr()->UnregisterPool(resource_pool_);
   }
 }
+
+// The accessors below are defined here rather than inline in runtime-state.h so that
+// the header does not need to include runtime/exec-env.h or util/auth-util.h.
+
+const std::string& RuntimeState::GetEffectiveUser() const {
+  return impala::GetEffectiveUser(query_ctx().session);
+}
+
+ImpalaBackendClientCache* RuntimeState::impalad_client_cache() {
+  return exec_env_->impalad_client_cache();
+}
+
+CatalogServiceClientCache* RuntimeState::catalogd_client_cache() {
+  return exec_env_->catalogd_client_cache();
+}
+
+DiskIoMgr* RuntimeState::io_mgr() {
+  return exec_env_->disk_io_mgr();
+}
+
+DataStreamMgr* RuntimeState::stream_mgr() {
+  return exec_env_->stream_mgr();
+}
+
+HBaseTableFactory* RuntimeState::htable_factory() {
+  return exec_env_->htable_factory();
+}
+
+// Returns the TQueryCtx of the shared QueryState if there is one, otherwise the local
+// copy taken by the (const TQueryCtx&) c'tor.
+const TQueryCtx& RuntimeState::query_ctx() const {
+  return query_state_ != nullptr ? query_state_->query_ctx() : local_query_ctx_;
+}
+
+// Goes through query_ctx() so the QueryState-vs-local fallback lives in one place.
+const TQueryOptions& RuntimeState::query_options() const {
+  return query_ctx().client_request.query_options;
+}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index de3ab94..b56097a 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -26,16 +26,16 @@
 // NOTE: try not to add more headers here: runtime-state.h is included in many 
many files.
 #include "common/global-types.h"  // for PlanNodeId
 #include "runtime/client-cache-types.h"
-#include "runtime/exec-env.h"
 #include "runtime/thread-resource-mgr.h"
-#include "util/auth-util.h" // for GetEffectiveUser()
 #include "util/runtime-profile.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
 
 namespace impala {
 
 class BufferedBlockMgr;
 class DataStreamRecvr;
 class DescriptorTbl;
+class DiskIoMgr;
 class DiskIoRequestContext;
 class Expr;
 class LlvmCodeGen;
@@ -45,8 +45,15 @@ class RuntimeFilterBank;
 class ScalarFnCall;
 class Status;
 class TimestampValue;
-class TQueryOptions;
 class TUniqueId;
+class ExecEnv;
+class DataStreamMgr;
+class HBaseTableFactory;
+class TPlanFragmentCtx;
+class TPlanFragmentInstanceCtx;
+class QueryState;
+
+/// TODO: move the typedefs into a separate .h (and fix the includes for that)
 
 /// Counts how many rows an INSERT query has added to a particular partition
 /// (partitions are identified by their partition keys: k1=v1/k2=v2
@@ -68,10 +75,13 @@ typedef std::map<std::string, std::string> FileMoveMap;
 /// destruction.
 class RuntimeState {
  public:
-  RuntimeState(const TExecPlanFragmentParams& fragment_params, ExecEnv* 
exec_env);
+  /// query_state, fragment_ctx, and instance_ctx need to be alive at least as long as
+  /// the constructed RuntimeState
+  RuntimeState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
+      const TPlanFragmentInstanceCtx& instance_ctx, ExecEnv* exec_env);
 
   /// RuntimeState for executing expr in fe-support.
-  RuntimeState(const TQueryCtx& query_ctx);
+  RuntimeState(const TQueryCtx& query_ctx, ExecEnv* exec_env = nullptr);
 
   /// Empty d'tor to avoid issues with scoped_ptr.
   ~RuntimeState();
@@ -82,8 +92,7 @@ class RuntimeState {
   /// when they are initialized. This function also initializes a user 
function mem
   /// tracker (in the fifth level). If 'request_pool' is null, no request pool 
mem
   /// tracker is set up, i.e. query pools will have the process mem pool as 
the parent.
-  void InitMemTrackers(const TUniqueId& query_id, const std::string* 
request_pool,
-      int64_t query_bytes_limit);
+  void InitMemTrackers(const std::string* request_pool, int64_t 
query_bytes_limit);
 
   /// Initializes the runtime filter bank. Must be called after 
InitMemTrackers().
   void InitFilterBank();
@@ -91,30 +100,19 @@ class RuntimeState {
   /// Gets/Creates the query wide block mgr.
   Status CreateBlockMgr();
 
+  QueryState* query_state() const { return query_state_; }
   ObjectPool* obj_pool() const { return obj_pool_.get(); }
   const DescriptorTbl& desc_tbl() const { return *desc_tbl_; }
   void set_desc_tbl(DescriptorTbl* desc_tbl) { desc_tbl_ = desc_tbl; }
-  const TQueryOptions& query_options() const {
-    return query_ctx().request.query_options;
-  }
-  int batch_size() const { return 
query_ctx().request.query_options.batch_size; }
-  bool abort_on_error() const {
-    return query_ctx().request.query_options.abort_on_error;
-  }
-  bool strict_mode() const {
-    return query_ctx().request.query_options.strict_mode;
-  }
+  const TQueryOptions& query_options() const;
+  int batch_size() const { return query_options().batch_size; }
+  bool abort_on_error() const { return query_options().abort_on_error; }
+  bool strict_mode() const { return query_options().strict_mode; }
   bool abort_on_default_limit_exceeded() const {
-    return query_ctx().request.query_options.abort_on_default_limit_exceeded;
-  }
-  const TQueryCtx& query_ctx() const { return fragment_params_.query_ctx; }
-  const TPlanFragmentInstanceCtx& fragment_ctx() const {
-    return fragment_params_.fragment_instance_ctx;
-  }
-  const TExecPlanFragmentParams& fragment_params() const { return 
fragment_params_; }
-  const std::string& effective_user() const {
-    return GetEffectiveUser(query_ctx().session);
+    return query_options().abort_on_default_limit_exceeded;
   }
+  const TQueryCtx& query_ctx() const;
+  const TPlanFragmentInstanceCtx& instance_ctx() const { return 
*instance_ctx_; }
   const TUniqueId& session_id() const { return query_ctx().session.session_id; 
}
   const std::string& do_as_user() const { return 
query_ctx().session.delegated_user; }
   const std::string& connected_user() const {
@@ -124,18 +122,16 @@ class RuntimeState {
   void set_now(const TimestampValue* now);
   const TUniqueId& query_id() const { return query_ctx().query_id; }
   const TUniqueId& fragment_instance_id() const {
-    return fragment_ctx().fragment_instance_id;
+    return instance_ctx_ != nullptr
+        ? instance_ctx_->fragment_instance_id
+        : no_instance_id_;
   }
   ExecEnv* exec_env() { return exec_env_; }
-  DataStreamMgr* stream_mgr() { return exec_env_->stream_mgr(); }
-  HBaseTableFactory* htable_factory() { return exec_env_->htable_factory(); }
-  ImpalaBackendClientCache* impalad_client_cache() {
-    return exec_env_->impalad_client_cache();
-  }
-  CatalogServiceClientCache* catalogd_client_cache() {
-    return exec_env_->catalogd_client_cache();
-  }
-  DiskIoMgr* io_mgr() { return exec_env_->disk_io_mgr(); }
+  DataStreamMgr* stream_mgr();
+  HBaseTableFactory* htable_factory();
+  ImpalaBackendClientCache* impalad_client_cache();
+  CatalogServiceClientCache* catalogd_client_cache();
+  DiskIoMgr* io_mgr();
   MemTracker* instance_mem_tracker() { return instance_mem_tracker_.get(); }
   MemTracker* query_mem_tracker() { return query_mem_tracker_.get(); }
   ThreadResourceMgr::ResourcePool* resource_pool() { return resource_pool_; }
@@ -161,6 +157,8 @@ class RuntimeState {
   /// Returns the LlvmCodeGen object for this fragment instance.
   LlvmCodeGen* codegen() { return codegen_.get(); }
 
+  const std::string& GetEffectiveUser() const;
+
   /// Add ScalarFnCall expression 'udf' to be codegen'd later if it's not 
disabled by
   /// query option. This is for cases in which the UDF cannot be interpreted 
or if the
   /// plan fragment doesn't contain any codegen enabled operator.
@@ -321,15 +319,13 @@ class RuntimeState {
   friend class TestEnv;
 
   /// Set per-fragment state.
-  Status Init(ExecEnv* exec_env);
+  void Init();
 
   /// Use a custom block manager for the query for testing purposes.
   void set_block_mgr(const std::shared_ptr<BufferedBlockMgr>& block_mgr) {
     block_mgr_ = block_mgr;
   }
 
-  static const int DEFAULT_BATCH_SIZE = 1024;
-
   DescriptorTbl* desc_tbl_ = nullptr;
   boost::scoped_ptr<ObjectPool> obj_pool_;
 
@@ -339,15 +335,23 @@ class RuntimeState {
   /// Logs error messages.
   ErrorLogMap error_log_;
 
-  /// Original thrift descriptor for this fragment. Includes its unique id, 
the total
-  /// number of fragment instances, the query context, the coordinator 
address, the
-  /// descriptor table, etc.
-  TExecPlanFragmentParams fragment_params_;
+  /// Global QueryState and original thrift descriptors for this fragment instance.
+  /// Not set by the (const TQueryCtx&) c'tor.
+  QueryState* const query_state_;
+  const TPlanFragmentCtx* const fragment_ctx_;
+  const TPlanFragmentInstanceCtx* const instance_ctx_;
+
+  /// Provides query ctx if query_state_ == nullptr.
+  TQueryCtx local_query_ctx_;
+
+  /// Provides instance id if instance_ctx_ == nullptr
+  TUniqueId no_instance_id_;
 
   /// Query-global timestamp, e.g., for implementing now(). Set from 
query_globals_.
   /// Use pointer to avoid inclusion of timestampvalue.h and avoid clang 
issues.
   boost::scoped_ptr<TimestampValue> now_;
 
+  /// TODO: get rid of this and use ExecEnv::GetInstance() instead
   ExecEnv* exec_env_;
   boost::scoped_ptr<LlvmCodeGen> codegen_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 5f98d13..8675ae9 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -65,37 +65,11 @@ TestEnv::~TestEnv() {
   metrics_.reset();
 }
 
-Status TestEnv::CreatePerQueryState(int64_t query_id, int max_buffers, int 
block_size,
-    RuntimeState** runtime_state, TQueryOptions* query_options) {
-  // Enforce invariant that each query ID can be registered at most once.
-  if (runtime_states_.find(query_id) != runtime_states_.end()) {
-    return Status(Substitute("Duplicate query id found: $0", query_id));
-  }
-
-  TExecPlanFragmentParams plan_params = TExecPlanFragmentParams();
-  if (query_options != NULL) plan_params.query_ctx.request.query_options = 
*query_options;
-  plan_params.query_ctx.query_id.hi = 0;
-  plan_params.query_ctx.query_id.lo = query_id;
-
-  *runtime_state = new RuntimeState(plan_params, exec_env_.get());
-  (*runtime_state)->InitMemTrackers(TUniqueId(), NULL, -1);
-
-  shared_ptr<BufferedBlockMgr> mgr;
-  RETURN_IF_ERROR(BufferedBlockMgr::Create(*runtime_state,
-      (*runtime_state)->query_mem_tracker(), 
(*runtime_state)->runtime_profile(),
-      tmp_file_mgr_.get(), CalculateMemLimit(max_buffers, block_size), 
block_size, &mgr));
-  (*runtime_state)->set_block_mgr(mgr);
-
-  runtime_states_[query_id] = shared_ptr<RuntimeState>(*runtime_state);
-  return Status::OK();
-}
-
 void TestEnv::TearDownRuntimeStates() {
   for (auto& runtime_state : runtime_states_) 
runtime_state.second->ReleaseResources();
   runtime_states_.clear();
 }
 
-
 int64_t TestEnv::CalculateMemLimit(int max_buffers, int block_size) {
   DCHECK_GE(max_buffers, -1);
   if (max_buffers == -1) return -1;
@@ -109,4 +83,29 @@ int64_t TestEnv::TotalQueryMemoryConsumption() {
   }
   return total;
 }
+
+Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_size,
+    const TQueryOptions* query_options, RuntimeState** runtime_state) {
+  // Enforce invariant that each query ID can be registered at most once.
+  if (runtime_states_.find(query_id) != runtime_states_.end()) {
+    return Status(Substitute("Duplicate query id found: $0", query_id));
+  }
+
+  TQueryCtx query_ctx;
+  if (query_options != nullptr) query_ctx.client_request.query_options = *query_options;
+  query_ctx.query_id.hi = 0;
+  query_ctx.query_id.lo = query_id;
+  *runtime_state = new RuntimeState(query_ctx, exec_env_.get());
+  // Register first so the TestEnv owns the state even if block mgr creation fails.
+  runtime_states_[query_id] = shared_ptr<RuntimeState>(*runtime_state);
+  (*runtime_state)->InitMemTrackers(nullptr, -1);
+
+  shared_ptr<BufferedBlockMgr> mgr;
+  RETURN_IF_ERROR(BufferedBlockMgr::Create(*runtime_state,
+      (*runtime_state)->query_mem_tracker(), (*runtime_state)->runtime_profile(),
+      tmp_file_mgr_.get(), CalculateMemLimit(max_buffers, block_size), block_size, &mgr));
+  (*runtime_state)->set_block_mgr(mgr);
+  return Status::OK();
+}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/test-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h
index 6399110..e648977 100644
--- a/be/src/runtime/test-env.h
+++ b/be/src/runtime/test-env.h
@@ -23,6 +23,7 @@
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/runtime-state.h"
+#include "runtime/query-state.h"
 
 namespace impala {
 
@@ -39,9 +40,9 @@ class TestEnv {
 
   /// Create a RuntimeState for a query with a new block manager and the given 
query
   /// options. The RuntimeState is owned by the TestEnv. Returns an error if
-  /// CreatePerQueryState() has been called with the same query ID already.
-  Status CreatePerQueryState(int64_t query_id, int max_buffers, int block_size,
-      RuntimeState** runtime_state, TQueryOptions* query_options = NULL);
+  /// CreateQueryState() has been called with the same query ID already.
+  Status CreateQueryState(int64_t query_id, int max_buffers, int block_size,
+      const TQueryOptions* query_options, RuntimeState** runtime_state);
 
   /// Destroy all RuntimeStates and block managers created by this TestEnv.
   void TearDownRuntimeStates();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/thread-resource-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/thread-resource-mgr.h b/be/src/runtime/thread-resource-mgr.h
index 612e733..8b86dcc 100644
--- a/be/src/runtime/thread-resource-mgr.h
+++ b/be/src/runtime/thread-resource-mgr.h
@@ -60,6 +60,8 @@ namespace impala {
 ///  - Priorities for different pools
 /// If both the mgr and pool locks need to be taken, the mgr lock must
 /// be taken first.
+///
+/// TODO: make ResourcePool a stand-alone class
 class ThreadResourceMgr {
  public:
   class ResourcePool;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/scheduling/query-schedule.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index e2dc7c4..668c9c8 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -78,7 +78,7 @@ void QuerySchedule::Init() {
   if (request_.stmt_type == TStmtType::QUERY) {
     fragment_exec_params_[root_fragment.idx].is_coord_fragment = true;
     // the coordinator instance gets index 0, generated instance ids start at 1
-    next_instance_id_.lo = 1;
+    next_instance_id_ = CreateInstanceId(next_instance_id_, 1);
   }
 
   // find max node id

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/scheduling/request-pool-service.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/request-pool-service.cc b/be/src/scheduling/request-pool-service.cc
index 9bddde9..b674bf0 100644
--- a/be/src/scheduling/request-pool-service.cc
+++ b/be/src/scheduling/request-pool-service.cc
@@ -160,7 +160,7 @@ Status RequestPoolService::ResolveRequestPool(const TQueryCtx& ctx,
     user = DEFAULT_USER;
   }
 
-  const string& requested_pool = ctx.request.query_options.request_pool;
+  const string& requested_pool = ctx.client_request.query_options.request_pool;
   TResolveRequestPoolParams params;
   params.__set_user(user);
   params.__set_requested_pool(requested_pool);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/scheduling/simple-scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc
index ff0337c..95414e4 100644
--- a/be/src/scheduling/simple-scheduler.cc
+++ b/be/src/scheduling/simple-scheduler.cc
@@ -480,7 +480,8 @@ void SimpleScheduler::CreateUnionInstances(
 void SimpleScheduler::CreateScanInstances(
     PlanNodeId leftmost_scan_id, FragmentExecParams* fragment_params,
     QuerySchedule* schedule) {
-  int max_num_instances = 
schedule->request().query_ctx.request.query_options.mt_dop;
+  int max_num_instances =
+      schedule->request().query_ctx.client_request.query_options.mt_dop;
   if (max_num_instances == 0) max_num_instances = 1;
 
   if (fragment_params->scan_range_assignment.empty()) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt
index 35130ff..6d5a897 100644
--- a/be/src/service/CMakeLists.txt
+++ b/be/src/service/CMakeLists.txt
@@ -24,13 +24,12 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/service")
 add_library(Service
   frontend.cc
   fe-support.cc
-  fragment-exec-state.cc
-  fragment-mgr.cc
   hs2-util.cc
   impala-server.cc
   impala-http-handler.cc
   impala-hs2-server.cc
   impala-beeswax-server.cc
+  impala-internal-service.cc
   query-exec-state.cc
   query-options.cc
   query-result-set.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 9415a76..0cc9761 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -95,7 +95,7 @@ Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow(
   query_ctx.disable_codegen_hint = true;
   // Allow logging of at least one error, so we can detect and convert it into 
a
   // Java exception.
-  query_ctx.request.query_options.max_errors = 1;
+  query_ctx.client_request.query_options.max_errors = 1;
   RuntimeState state(query_ctx);
   // Make sure to close the runtime state no matter how this scope is exited.
   const auto close_runtime_state =
@@ -106,11 +106,11 @@ Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow(
   // Exprs can allocate memory so we need to set up the mem trackers before
   // preparing/running the exprs.
   int64_t mem_limit = -1;
-  if (query_ctx.request.query_options.__isset.mem_limit
-      && query_ctx.request.query_options.mem_limit > 0) {
-    mem_limit = query_ctx.request.query_options.mem_limit;
+  if (query_ctx.client_request.query_options.__isset.mem_limit
+      && query_ctx.client_request.query_options.mem_limit > 0) {
+    mem_limit = query_ctx.client_request.query_options.mem_limit;
   }
-  state.InitMemTrackers(state.query_id(), NULL, mem_limit);
+  state.InitMemTrackers(NULL, mem_limit);
 
   // Prepare() the exprs. Always Close() the exprs even in case of errors.
   vector<ExprContext*> expr_ctxs;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/fragment-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-exec-state.cc b/be/src/service/fragment-exec-state.cc
deleted file mode 100644
index 337be82..0000000
--- a/be/src/service/fragment-exec-state.cc
+++ /dev/null
@@ -1,145 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "service/fragment-exec-state.h"
-
-#include <sstream>
-
-#include "codegen/llvm-codegen.h"
-#include "gen-cpp/ImpalaInternalService.h"
-#include "rpc/thrift-util.h"
-#include "gutil/strings/substitute.h"
-#include "runtime/runtime-filter-bank.h"
-#include "util/bloom-filter.h"
-#include "runtime/backend-client.h"
-
-#include "common/names.h"
-
-using namespace apache::thrift;
-using namespace strings;
-using namespace impala;
-
-Status FragmentMgr::FragmentExecState::UpdateStatus(const Status& status) {
-  lock_guard<mutex> l(status_lock_);
-  if (!status.ok() && exec_status_.ok()) exec_status_ = status;
-  return exec_status_;
-}
-
-Status FragmentMgr::FragmentExecState::Cancel() {
-  lock_guard<mutex> l(status_lock_);
-  RETURN_IF_ERROR(exec_status_);
-  executor_.Cancel();
-  return Status::OK();
-}
-
-void FragmentMgr::FragmentExecState::Exec() {
-  Status status = executor_.Prepare(exec_params_);
-  prepare_promise_.Set(status);
-  if (status.ok()) {
-    if (executor_.Open().ok()) {
-      executor_.Exec();
-    }
-  }
-  executor_.Close();
-}
-
-void FragmentMgr::FragmentExecState::ReportStatusCb(
-    const Status& status, RuntimeProfile* profile, bool done) {
-  DCHECK(status.ok() || done);  // if !status.ok() => done
-  Status exec_status = UpdateStatus(status);
-
-  Status coord_status;
-  ImpalaBackendConnection coord(client_cache_, coord_address(), &coord_status);
-  if (!coord_status.ok()) {
-    stringstream s;
-    s << "Couldn't get a client for " << coord_address() <<"\tReason: "
-      << coord_status.GetDetail();
-    UpdateStatus(Status(ErrorMsg(TErrorCode::INTERNAL_ERROR, s.str())));
-    return;
-  }
-
-  TReportExecStatusParams params;
-  params.protocol_version = ImpalaInternalServiceVersion::V1;
-  params.__set_query_id(query_ctx_.query_id);
-  params.__set_fragment_instance_id(fragment_instance_ctx_.fragment_instance_id);
-  exec_status.SetTStatus(&params);
-  params.__set_done(done);
-
-  if (profile != NULL) {
-    profile->ToThrift(&params.profile);
-    params.__isset.profile = true;
-  }
-
-  RuntimeState* runtime_state = executor_.runtime_state();
-  // If executor_ did not successfully prepare, runtime state may not have been set.
-  if (runtime_state != NULL) {
-    // Only send updates to insert status if fragment is finished, the coordinator
-    // waits until query execution is done to use them anyhow.
-    if (done) {
-      TInsertExecStatus insert_status;
-
-      if (runtime_state->hdfs_files_to_move()->size() > 0) {
-        insert_status.__set_files_to_move(*runtime_state->hdfs_files_to_move());
-      }
-      if (runtime_state->per_partition_status()->size() > 0) {
-        insert_status.__set_per_partition_status(*runtime_state->per_partition_status());
-      }
-
-      params.__set_insert_exec_status(insert_status);
-    }
-
-    // Send new errors to coordinator
-    runtime_state->GetUnreportedErrors(&(params.error_log));
-  }
-  params.__isset.error_log = (params.error_log.size() > 0);
-
-  TReportExecStatusResult res;
-  Status rpc_status;
-  bool retry_is_safe;
-  // Try to send the RPC 3 times before failing.
-  for (int i = 0; i < 3; ++i) {
-    rpc_status = coord.DoRpc(&ImpalaBackendClient::ReportExecStatus, params, &res,
-        &retry_is_safe);
-    if (rpc_status.ok()) {
-      rpc_status = Status(res.status);
-      break;
-    }
-    if (!retry_is_safe) break;
-    if (i < 2) SleepForMs(100);
-  }
-  if (!rpc_status.ok()) {
-    UpdateStatus(rpc_status);
-    executor_.Cancel();
-  }
-}
-
-void FragmentMgr::FragmentExecState::PublishFilter(int32_t filter_id,
-    const TBloomFilter& thrift_bloom_filter) {
-  // Defensively protect against blocking forever in case there's some problem with
-  // Prepare().
-  static const int WAIT_MS = 30000;
-  bool timed_out = false;
-  // Wait until Prepare() is done, so we know that the filter bank is set up.
-  Status prepare_status = prepare_promise_.Get(WAIT_MS, &timed_out);
-  if (timed_out) {
-    LOG(ERROR) << "Unexpected timeout in PublishFilter()";
-    return;
-  }
-  if (!prepare_status.ok()) return;
-  executor_.runtime_state()->filter_bank()->PublishGlobalFilter(filter_id,
-      thrift_bloom_filter);
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/fragment-exec-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-exec-state.h b/be/src/service/fragment-exec-state.h
deleted file mode 100644
index 1c64f2d..0000000
--- a/be/src/service/fragment-exec-state.h
+++ /dev/null
@@ -1,105 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_SERVICE_FRAGMENT_EXEC_STATE_H
-#define IMPALA_SERVICE_FRAGMENT_EXEC_STATE_H
-
-#include <boost/bind.hpp>
-#include <boost/thread/mutex.hpp>
-
-#include "common/status.h"
-#include "runtime/client-cache.h"
-#include "runtime/plan-fragment-executor.h"
-#include "service/fragment-mgr.h"
-
-namespace impala {
-
-/// Execution state of a single plan fragment.
-class FragmentMgr::FragmentExecState {
- public:
-  FragmentExecState(const TExecPlanFragmentParams& params, ExecEnv* exec_env)
-    : query_ctx_(params.query_ctx), fragment_instance_ctx_(params.fragment_instance_ctx),
-      executor_(exec_env, boost::bind<void>(
-          boost::mem_fn(&FragmentMgr::FragmentExecState::ReportStatusCb),
-              this, _1, _2, _3)),
-      client_cache_(exec_env->impalad_client_cache()), exec_params_(params) {
-  }
-
-  /// Calling the d'tor releases all memory and closes all data streams
-  /// held by executor_.
-  ~FragmentExecState() { }
-
-  /// Returns current execution status, if there was an error. Otherwise cancels
-  /// the fragment and returns OK.
-  Status Cancel();
-
-  /// Main loop of plan fragment execution. Blocks until execution finishes.
-  void Exec();
-
-  const TUniqueId& query_id() const { return query_ctx_.query_id; }
-
-  const TUniqueId& fragment_instance_id() const {
-    return fragment_instance_ctx_.fragment_instance_id;
-  }
-
-  const TNetworkAddress& coord_address() const { return query_ctx_.coord_address; }
-
-  /// Set the execution thread, taking ownership of the object.
-  void set_exec_thread(Thread* exec_thread) { exec_thread_.reset(exec_thread); }
-
-  /// Publishes filter with ID 'filter_id' to this fragment's filter bank.
-  void PublishFilter(int32_t filter_id, const TBloomFilter& thrift_bloom_filter);
-
-  PlanFragmentExecutor* executor() { return &executor_; }
-
- private:
-  TQueryCtx query_ctx_;
-  TPlanFragmentInstanceCtx fragment_instance_ctx_;
-  PlanFragmentExecutor executor_;
-  ImpalaBackendClientCache* client_cache_;
-  TExecPlanFragmentParams exec_params_;
-
-  /// the thread executing this plan fragment
-  boost::scoped_ptr<Thread> exec_thread_;
-
-  /// protects exec_status_
-  boost::mutex status_lock_;
-
-  /// set in ReportStatusCb();
-  /// if set to anything other than OK, execution has terminated w/ an error
-  Status exec_status_;
-
-  /// Barrier for the completion of executor_.Prepare().
-  Promise<Status> prepare_promise_;
-
-  /// Update 'exec_status_' w/ 'status', if the former is not already an error.
-  /// Returns the value of 'exec_status_' after this method completes.
-  Status UpdateStatus(const Status& status);
-
-  /// Callback for executor; updates exec_status_ if 'status' indicates an error
-  /// or if there was a thrift error.
-  /// If not NULL, `profile` is encoded as a Thrift structure and transmitted as part of
-  /// the reporting RPC. `profile` may be NULL if a runtime profile has not been created
-  /// for this fragment (e.g. when the fragment has failed during preparation).
-  /// The executor must ensure that there is only one invocation at a time.
-  void ReportStatusCb(const Status& status, RuntimeProfile* profile, bool done);
-
-};
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/fragment-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-mgr.cc b/be/src/service/fragment-mgr.cc
deleted file mode 100644
index 8e8fc05..0000000
--- a/be/src/service/fragment-mgr.cc
+++ /dev/null
@@ -1,154 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "service/fragment-mgr.h"
-
-#include <boost/lexical_cast.hpp>
-#include <gperftools/malloc_extension.h>
-#include <gutil/strings/substitute.h>
-
-#include "service/fragment-exec-state.h"
-#include "runtime/exec-env.h"
-#include "runtime/mem-tracker.h"
-#include "util/impalad-metrics.h"
-#include "util/uid-util.h"
-
-#include "common/names.h"
-
-using namespace impala;
-using namespace strings;
-
-// TODO: this logging should go into a per query log.
-DEFINE_int32(log_mem_usage_interval, 0, "If non-zero, impalad will output memory usage "
-    "every log_mem_usage_interval'th fragment completion.");
-
-Status FragmentMgr::ExecPlanFragment(const TExecPlanFragmentParams& exec_params) {
-  VLOG_QUERY << "ExecPlanFragment() instance_id="
-             << PrintId(exec_params.fragment_instance_ctx.fragment_instance_id)
-             << " coord=" << exec_params.query_ctx.coord_address;
-
-  // Preparing and opening the fragment creates a thread and consumes a non-trivial
-  // amount of memory. If we are already starved for memory, cancel the fragment as
-  // early as possible to avoid digging the hole deeper.
-  MemTracker* process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker();
-  if (process_mem_tracker->LimitExceeded()) {
-    string msg = Substitute("Instance $0 of plan fragment $1 of query $2 could not "
-        "start because the backend Impala daemon is over its memory limit",
-        PrintId(exec_params.fragment_instance_ctx.fragment_instance_id),
-        exec_params.fragment_ctx.fragment.display_name,
-        PrintId(exec_params.query_ctx.query_id));
-    return process_mem_tracker->MemLimitExceeded(NULL, msg, 0);
-  }
-
-  shared_ptr<FragmentExecState> exec_state(
-      new FragmentExecState(exec_params, ExecEnv::GetInstance()));
-
-  // Register exec_state before this RPC returns so that async Cancel() calls (which can
-  // only happen after this RPC returns) can always find this fragment.
-  {
-    lock_guard<SpinLock> l(fragment_exec_state_map_lock_);
-    DCHECK(fragment_exec_state_map_.find(exec_state->fragment_instance_id())
-        == fragment_exec_state_map_.end());
-    fragment_exec_state_map_.insert(
-        make_pair(exec_state->fragment_instance_id(), exec_state));
-  }
-
-  ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(1L);
-  ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->Increment(1L);
-
-  // Execute plan fragment in new thread.
-  // TODO: manage threads via global thread pool?
-  const TUniqueId& fragment_id = exec_state->fragment_instance_id();
-  exec_state->set_exec_thread(new Thread("fragment-mgr",
-      Substitute("exec-plan-fragment-$0", PrintId(fragment_id)),
-          &FragmentMgr::FragmentThread, this, fragment_id));
-
-  return Status::OK();
-}
-
-void FragmentMgr::FragmentThread(TUniqueId fragment_instance_id) {
-  shared_ptr<FragmentExecState> exec_state = GetFragmentExecState(fragment_instance_id);
-  if (exec_state.get() == NULL) return;
-  exec_state->Exec();
-
-  // We're done with this plan fragment
-  {
-    lock_guard<SpinLock> l(fragment_exec_state_map_lock_);
-    size_t num_erased =
-        fragment_exec_state_map_.erase(exec_state->fragment_instance_id());
-    DCHECK_EQ(num_erased, 1);
-  }
-  // TODO: this might be imprecise, if another client of FragmentMgr has a reference to
-  // the fragment exec state.
-  ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(-1L);
-
-  VLOG_QUERY << "PlanFragment completed. instance_id=" << fragment_instance_id;
-
-#ifndef ADDRESS_SANITIZER
-  // tcmalloc and address sanitizer can not be used together
-  if (FLAGS_log_mem_usage_interval > 0) {
-    uint64_t num_complete = ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->value();
-    if (num_complete % FLAGS_log_mem_usage_interval == 0) {
-      char buf[2048];
-      // This outputs how much memory is currently being used by this impalad
-      MallocExtension::instance()->GetStats(buf, 2048);
-      LOG(INFO) << buf;
-    }
-  }
-#endif
-}
-
-shared_ptr<FragmentMgr::FragmentExecState> FragmentMgr::GetFragmentExecState(
-    const TUniqueId& fragment_instance_id) {
-  lock_guard<SpinLock> l(fragment_exec_state_map_lock_);
-  FragmentExecStateMap::const_iterator i =
-      fragment_exec_state_map_.find(fragment_instance_id);
-  if (i == fragment_exec_state_map_.end()) return shared_ptr<FragmentExecState>();
-
-  return i->second;
-}
-
-void FragmentMgr::CancelPlanFragment(TCancelPlanFragmentResult& return_val,
-    const TCancelPlanFragmentParams& params) {
-  VLOG_QUERY << "CancelPlanFragment(): instance_id=" << params.fragment_instance_id;
-  shared_ptr<FragmentExecState> exec_state =
-      GetFragmentExecState(params.fragment_instance_id);
-  if (exec_state.get() == NULL) {
-    Status status(ErrorMsg(TErrorCode::INTERNAL_ERROR,
-        Substitute("Unknown fragment id: $0",
-        lexical_cast<string>(params.fragment_instance_id))));
-    status.SetTStatus(&return_val);
-    return;
-  }
-  // we only initiate cancellation here, the map entry as well as the exec state
-  // are removed when fragment execution terminates (which is at present still
-  // running in exec_state->exec_thread_)
-  exec_state->Cancel().SetTStatus(&return_val);
-}
-
-void FragmentMgr::PublishFilter(TPublishFilterResult& return_val,
-    const TPublishFilterParams& params) {
-  VLOG_FILE << "PublishFilter(): dst_instance_id=" << params.dst_instance_id;
-  shared_ptr<FragmentExecState> fragment_exec_state =
-      GetFragmentExecState(params.dst_instance_id);
-  if (fragment_exec_state.get() == NULL) {
-    LOG(INFO) << "Unknown fragment (ID: " << params.dst_instance_id
-              << ") for filter (ID: " << params.filter_id << ")";
-    return;
-  }
-  fragment_exec_state->PublishFilter(params.filter_id, params.bloom_filter);
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/fragment-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-mgr.h b/be/src/service/fragment-mgr.h
deleted file mode 100644
index 1a180cf..0000000
--- a/be/src/service/fragment-mgr.h
+++ /dev/null
@@ -1,89 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_SERVICE_FRAGMENT_MGR_H
-#define IMPALA_SERVICE_FRAGMENT_MGR_H
-
-#include <boost/thread/mutex.hpp>
-#include <boost/unordered_map.hpp>
-
-#include "gen-cpp/ImpalaInternalService.h"
-#include "common/status.h"
-#include "util/spinlock.h"
-
-namespace impala {
-
-/// Manages execution of individual plan fragment instances, which are typically run as a
-/// result of ExecPlanFragment() RPCs that arrive via the internal Impala interface.
-//
-/// A fragment is started in ExecPlanFragment(), which starts a thread which runs
-/// FragmentThread() asynchronously and returns. Fragments are Prepare()'d in that thread,
-/// and then Exec() is called. At any point a fragment may terminate either by
-/// cancellation via CancelPlanFragment(), or when FragmentThread() returns.
-//
-/// TODO: Remove Thrift args from methods where it would improve readability;
-/// ImpalaInternalService can take care of translation to / from Thrift, as it already
-/// does for ExecPlanFragment()'s return value.
-class FragmentMgr {
- public:
-  /// Registers a new FragmentExecState and launches the thread that calls Prepare() and
-  /// Exec() on it.
-  ///
-  /// Returns an error if there was some unrecoverable problem before the fragment was
-  /// registered (like low memory). Otherwise, returns OK, which guarantees that the
-  /// fragment is registered and must either run to completion or be cancelled.
-  ///
-  /// After this call returns, it is legal to call CancelPlanFragment() on this
-  /// fragment. If this call returns an error, CancelPlanFragment() will be a no-op
-  /// (because the fragment is unregistered).
-  Status ExecPlanFragment(const TExecPlanFragmentParams& params);
-
-  /// Cancels a plan fragment that is running asynchronously.
-  void CancelPlanFragment(TCancelPlanFragmentResult& return_val,
-      const TCancelPlanFragmentParams& params);
-
-  class FragmentExecState;
-
-  /// Returns a shared pointer to the FragmentExecState if one can be found for the
-  /// given id. If the id is not found, the shared pointer will contain NULL.
-  std::shared_ptr<FragmentExecState> GetFragmentExecState(
-      const TUniqueId& fragment_instance_id);
-
-  /// Publishes a global runtime filter to a local fragment.
-  void PublishFilter(TPublishFilterResult& return_val,
-      const TPublishFilterParams& params);
-
- private:
-  /// protects fragment_exec_state_map_
-  SpinLock fragment_exec_state_map_lock_;
-
-  /// Map from fragment instance id to exec state; FragmentExecState is owned by us and
-  /// referenced as a shared_ptr to allow asynchronous calls to CancelPlanFragment()
-  typedef boost::unordered_map<TUniqueId, std::shared_ptr<FragmentExecState>>
-  FragmentExecStateMap;
-  FragmentExecStateMap fragment_exec_state_map_;
-
-  /// Retrieves the 'exec_state' corresponding to fragment_instance_id. Calls
-  /// exec_state->Prepare() and then exec_state->Exec(). Finally unregisters the
-  /// exec_state from fragment_exec_state_map_. The exec_state must previously have been
-  /// registered in fragment_exec_state_map_. Runs in the fragment's execution thread.
-  void FragmentThread(TUniqueId fragment_instance_id);
-};
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index e740be2..ad4963b 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -30,6 +30,8 @@
 #include "service/query-result-set.h"
 #include "util/impalad-metrics.h"
 #include "util/webserver.h"
+#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 
@@ -66,7 +68,7 @@ void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
   RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state),
       SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
 
-  exec_state->UpdateNonErrorQueryState(QueryState::RUNNING);
+  exec_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING);
   // start thread to wait for results to become available, which will allow
   // us to advance query state to FINISHED or EXCEPTION
   exec_state->WaitAsync();
@@ -107,7 +109,7 @@ void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query,
   RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state),
       SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
 
-  exec_state->UpdateNonErrorQueryState(QueryState::RUNNING);
+  exec_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING);
   // Once the query is running do a final check for session closure and add it to the
   // set of in-flight queries.
   Status status = SetQueryInflight(session, exec_state);
@@ -123,7 +125,7 @@ void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query,
     RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
   }
 
-  exec_state->UpdateNonErrorQueryState(QueryState::FINISHED);
+  exec_state->UpdateNonErrorQueryState(beeswax::QueryState::FINISHED);
   TUniqueIdToQueryHandle(exec_state->query_id(), &query_handle);
 
   // If the input log context id is an empty string, then create a new number and
@@ -146,7 +148,7 @@ void ImpalaServer::explain(QueryExplanation& query_explanation, const Query& que
       exec_env_->frontend()->GetExplainPlan(query_ctx, &query_explanation.textual),
       SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
   query_explanation.__isset.textual = true;
-  VLOG_QUERY << "explain():\nstmt=" << query_ctx.request.stmt
+  VLOG_QUERY << "explain():\nstmt=" << query_ctx.client_request.stmt
              << "\nplan: " << query_explanation.textual;
 }
 
@@ -233,7 +235,7 @@ void ImpalaServer::close(const QueryHandle& handle) {
   RAISE_IF_ERROR(UnregisterQuery(query_id, true), SQLSTATE_GENERAL_ERROR);
 }
 
-QueryState::type ImpalaServer::get_state(const QueryHandle& handle) {
+beeswax::QueryState::type ImpalaServer::get_state(const QueryHandle& handle) {
   ScopedSessionState session_handle(this);
   RAISE_IF_ERROR(session_handle.WithSession(ThriftServer::GetThreadConnectionId()),
       SQLSTATE_GENERAL_ERROR);
@@ -250,7 +252,7 @@ QueryState::type ImpalaServer::get_state(const QueryHandle& handle) {
     RaiseBeeswaxException("Invalid query handle", SQLSTATE_GENERAL_ERROR);
   }
   // dummy to keep compiler happy
-  return QueryState::FINISHED;
+  return beeswax::QueryState::FINISHED;
 }
 
 void ImpalaServer::echo(string& echo_string, const string& input_string) {
@@ -393,7 +395,7 @@ void ImpalaServer::ResetTable(impala::TStatus& status, const TResetTableReq& req
 
 Status ImpalaServer::QueryToTQueryContext(const Query& query,
     TQueryCtx* query_ctx) {
-  query_ctx->request.stmt = query.query;
+  query_ctx->client_request.stmt = query.query;
   VLOG_QUERY << "query: " << ThriftDebugString(query);
   QueryOptionsMask set_query_options_mask;
   {
@@ -407,7 +409,7 @@ Status ImpalaServer::QueryToTQueryContext(const Query& query,
       // set yet, set it now.
       lock_guard<mutex> l(session->lock);
       if (session->connected_user.empty()) session->connected_user = query.hadoop_user;
-      query_ctx->request.query_options = session->default_query_options;
+      query_ctx->client_request.query_options = session->default_query_options;
       set_query_options_mask = session->set_query_options_mask;
     }
     session->ToThrift(session_id, &query_ctx->session);
@@ -416,7 +418,7 @@ Status ImpalaServer::QueryToTQueryContext(const Query& query,
   // Override default query options with Query.Configuration
   if (query.__isset.configuration) {
     for (const string& option: query.configuration) {
-      RETURN_IF_ERROR(ParseQueryOptions(option, &query_ctx->request.query_options,
+      RETURN_IF_ERROR(ParseQueryOptions(option, &query_ctx->client_request.query_options,
           &set_query_options_mask));
     }
   }
@@ -425,7 +427,7 @@ Status ImpalaServer::QueryToTQueryContext(const Query& query,
   // pool options.
   AddPoolQueryOptions(query_ctx, ~set_query_options_mask);
   VLOG_QUERY << "TClientRequest.queryOptions: "
-             << ThriftDebugString(query_ctx->request.query_options);
+             << ThriftDebugString(query_ctx->client_request.query_options);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index ab4d1f3..5313b7d 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -36,6 +36,7 @@
 #include "exprs/expr.h"
 #include "rpc/thrift-util.h"
 #include "runtime/raw-value.h"
+#include "runtime/exec-env.h"
 #include "service/hs2-util.h"
 #include "service/query-exec-state.h"
 #include "service/query-options.h"
@@ -87,7 +88,15 @@ const string IMPALA_RESULT_CACHING_OPT = "impala.resultset.cache.size";
 
 // Helper function to translate between Beeswax and HiveServer2 type
 static TOperationState::type QueryStateToTOperationState(
-    const beeswax::QueryState::type& query_state);
+    const beeswax::QueryState::type& query_state) {
+  switch (query_state) {
+    case beeswax::QueryState::CREATED: return TOperationState::INITIALIZED_STATE;
+    case beeswax::QueryState::RUNNING: return TOperationState::RUNNING_STATE;
+    case beeswax::QueryState::FINISHED: return TOperationState::FINISHED_STATE;
+    case beeswax::QueryState::EXCEPTION: return TOperationState::ERROR_STATE;
+    default: return TOperationState::UKNOWN_STATE;
+  }
+}
 
 void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
     TMetadataOpRequest* request, TOperationHandle* handle, thrift::TStatus* status) {
@@ -131,7 +140,7 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
       _TMetadataOpcode_VALUES_TO_NAMES.find(request->opcode);
   const string& query_text = query_text_it == _TMetadataOpcode_VALUES_TO_NAMES.end() ?
       "N/A" : query_text_it->second;
-  query_ctx.request.stmt = query_text;
+  query_ctx.client_request.stmt = query_text;
   exec_state.reset(new QueryExecState(query_ctx, exec_env_,
       exec_env_->frontend(), this, session));
   Status register_status = RegisterQuery(session, exec_state);
@@ -151,7 +160,7 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
     return;
   }
 
-  exec_state->UpdateNonErrorQueryState(QueryState::FINISHED);
+  exec_state->UpdateNonErrorQueryState(beeswax::QueryState::FINISHED);
 
   Status inflight_status = SetQueryInflight(session, exec_state);
   if (!inflight_status.ok()) {
@@ -215,7 +224,7 @@ Status ImpalaServer::FetchInternal(const TUniqueId& query_id, int32_t fetch_size
 
 Status ImpalaServer::TExecuteStatementReqToTQueryContext(
     const TExecuteStatementReq execute_request, TQueryCtx* query_ctx) {
-  query_ctx->request.stmt = execute_request.statement;
+  query_ctx->client_request.stmt = execute_request.statement;
   VLOG_QUERY << "TExecuteStatementReq: " << ThriftDebugString(execute_request);
   QueryOptionsMask set_query_options_mask;
   {
@@ -228,7 +237,7 @@ Status ImpalaServer::TExecuteStatementReqToTQueryContext(
     RETURN_IF_ERROR(GetSessionState(session_id, &session_state));
     session_state->ToThrift(session_id, &query_ctx->session);
     lock_guard<mutex> l(session_state->lock);
-    query_ctx->request.query_options = session_state->default_query_options;
+    query_ctx->client_request.query_options = session_state->default_query_options;
     set_query_options_mask = session_state->set_query_options_mask;
   }
 
@@ -243,14 +252,14 @@ Status ImpalaServer::TExecuteStatementReqToTQueryContext(
         continue;
       }
       RETURN_IF_ERROR(SetQueryOption(conf_itr->first, conf_itr->second,
-          &query_ctx->request.query_options, &set_query_options_mask));
+          &query_ctx->client_request.query_options, &set_query_options_mask));
     }
   }
   // Only query options not set in the session or confOverlay can be overridden by the
   // pool options.
   AddPoolQueryOptions(query_ctx, ~set_query_options_mask);
   VLOG_QUERY << "TClientRequest.queryOptions: "
-             << ThriftDebugString(query_ctx->request.query_options);
+             << ThriftDebugString(query_ctx->client_request.query_options);
   return Status::OK();
 }
 
@@ -455,7 +464,7 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
       HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
     }
   }
-  exec_state->UpdateNonErrorQueryState(QueryState::RUNNING);
+  exec_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING);
   // Start thread to wait for results to become available.
   exec_state->WaitAsync();
   // Once the query is running do a final check for session closure and add it to the
@@ -878,14 +887,5 @@ void ImpalaServer::RenewDelegationToken(TRenewDelegationTokenResp& return_val,
   return_val.status.__set_errorMessage("Not implemented");
 }
 
-TOperationState::type QueryStateToTOperationState(const QueryState::type& query_state) {
-  switch (query_state) {
-    case QueryState::CREATED: return TOperationState::INITIALIZED_STATE;
-    case QueryState::RUNNING: return TOperationState::RUNNING_STATE;
-    case QueryState::FINISHED: return TOperationState::FINISHED_STATE;
-    case QueryState::EXCEPTION: return TOperationState::ERROR_STATE;
-    default: return TOperationState::UKNOWN_STATE;
-  }
-}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/impala-http-handler.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index b675f3a..a324b1f 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -25,6 +25,7 @@
 #include "catalog/catalog-util.h"
 #include "gen-cpp/beeswax_types.h"
 #include "runtime/mem-tracker.h"
+#include "runtime/exec-env.h"
 #include "service/impala-server.h"
 #include "service/query-exec-state.h"
 #include "thrift/protocol/TDebugProtocol.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/impala-internal-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-internal-service.cc 
b/be/src/service/impala-internal-service.cc
new file mode 100644
index 0000000..fff7f9e
--- /dev/null
+++ b/be/src/service/impala-internal-service.cc
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "service/impala-internal-service.h"
+
+#include <boost/lexical_cast.hpp>
+
+#include "common/status.h"
+#include "service/impala-server.h"
+#include "runtime/query-exec-mgr.h"
+#include "runtime/query-state.h"
+#include "runtime/fragment-instance-state.h"
+#include "runtime/exec-env.h"
+#include "testutil/fault-injection-util.h"
+
+using namespace impala;
+
+// Caches the ImpalaServer and QueryExecMgr pointers handed out by
+// ExecEnv::GetInstance(). Both are DCHECKed to be non-null, since every RPC handler
+// below dereferences one of them without further checks.
+ImpalaInternalService::ImpalaInternalService() {
+  impala_server_ = ExecEnv::GetInstance()->impala_server();
+  DCHECK(impala_server_ != nullptr);
+  query_exec_mgr_ = ExecEnv::GetInstance()->query_exec_mgr();
+  DCHECK(query_exec_mgr_ != nullptr);
+}
+
+// RPC handler: logs the id of the fragment instance to start, runs the
+// fault-injection delay hook for this RPC, then forwards 'params' to
+// QueryExecMgr::StartFInstance() and stores the returned Status in 'return_val'.
+void ImpalaInternalService::ExecPlanFragment(TExecPlanFragmentResult& return_val,
+    const TExecPlanFragmentParams& params) {
+  VLOG_QUERY << "ExecPlanFragment():"
+            << " instance_id=" << params.fragment_instance_ctx.fragment_instance_id;
+  FAULT_INJECTION_RPC_DELAY(RPC_EXECPLANFRAGMENT);
+  query_exec_mgr_->StartFInstance(params).SetTStatus(&return_val);
+}
+
+// Builds an INTERNAL_ERROR Status with the message "Unknown <id_type> id: <id>" and
+// writes it into 'status_container' via Status::SetTStatus().
+//  - id_type: human-readable kind of id that was not found (callers in this file pass
+//    "query" or "instance").
+//  - id: the id that could not be resolved; rendered with lexical_cast<string>.
+//  - status_container: RPC result object that receives the error status.
+template <typename T> void SetUnknownIdError(
+    const string& id_type, const TUniqueId& id, T* status_container) {
+  Status status(ErrorMsg(TErrorCode::INTERNAL_ERROR,
+      Substitute("Unknown $0 id: $1", id_type, lexical_cast<string>(id))));
+  status.SetTStatus(status_container);
+}
+
+// RPC handler: cancels the fragment instance identified by
+// 'params.fragment_instance_id'.
+// 'return_val' receives:
+//  - an "Unknown query id" error if no QueryState is found for the owning query,
+//  - an "Unknown instance id" error if the query has no such fragment instance,
+//  - otherwise the Status returned by FragmentInstanceState::Cancel().
+void ImpalaInternalService::CancelPlanFragment(TCancelPlanFragmentResult& return_val,
+    const TCancelPlanFragmentParams& params) {
+  VLOG_QUERY << "CancelPlanFragment(): instance_id=" << params.fragment_instance_id;
+  FAULT_INJECTION_RPC_DELAY(RPC_CANCELPLANFRAGMENT);
+  // Look up the QueryState through the query id derived from the instance id.
+  // NOTE(review): ScopedRef presumably holds a reference on the QueryState until 'qs'
+  // goes out of scope - confirm against query-state.h.
+  QueryState::ScopedRef qs(GetQueryId(params.fragment_instance_id));
+  if (qs.get() == nullptr) {
+    SetUnknownIdError("query", GetQueryId(params.fragment_instance_id), &return_val);
+    return;
+  }
+  FragmentInstanceState* fis = qs->GetFInstanceState(params.fragment_instance_id);
+  if (fis == nullptr) {
+    SetUnknownIdError("instance", params.fragment_instance_id, &return_val);
+    return;
+  }
+  Status status = fis->Cancel();
+  status.SetTStatus(&return_val);
+}
+
+// RPC handler: logs the reporting fragment instance id, runs the fault-injection
+// delay hook for this RPC, then delegates to ImpalaServer::ReportExecStatus(), which
+// is handed 'return_val' to fill in.
+void ImpalaInternalService::ReportExecStatus(TReportExecStatusResult& return_val,
+    const TReportExecStatusParams& params) {
+  VLOG_QUERY << "ReportExecStatus(): instance_id=" << params.fragment_instance_id;
+  FAULT_INJECTION_RPC_DELAY(RPC_REPORTEXECSTATUS);
+  impala_server_->ReportExecStatus(return_val, params);
+}
+
+// RPC handler: runs the fault-injection delay hook for this RPC, then delegates to
+// ImpalaServer::TransmitData(). Unlike the other handlers in this file it does not
+// log the call.
+void ImpalaInternalService::TransmitData(TTransmitDataResult& return_val,
+    const TTransmitDataParams& params) {
+  FAULT_INJECTION_RPC_DELAY(RPC_TRANSMITDATA);
+  impala_server_->TransmitData(return_val, params);
+}
+
+// RPC handler: logs the filter id and query id, runs the fault-injection delay hook
+// for this RPC, then delegates to ImpalaServer::UpdateFilter().
+void ImpalaInternalService::UpdateFilter(TUpdateFilterResult& return_val,
+    const TUpdateFilterParams& params) {
+  VLOG_QUERY << "UpdateFilter(): filter=" << params.filter_id
+            << " query_id=" << PrintId(params.query_id);
+  FAULT_INJECTION_RPC_DELAY(RPC_UPDATEFILTER);
+  impala_server_->UpdateFilter(return_val, params);
+}
+
+// RPC handler: passes 'params.bloom_filter' for filter 'params.filter_id' to the
+// destination fragment instance 'params.dst_instance_id'.
+// If either the owning QueryState or the FragmentInstanceState cannot be found, the
+// call returns without doing anything. 'return_val' is never written on any path, so
+// the caller is not told whether the filter was published.
+void ImpalaInternalService::PublishFilter(TPublishFilterResult& return_val,
+    const TPublishFilterParams& params) {
+  VLOG_QUERY << "PublishFilter(): filter=" << params.filter_id
+            << " instance_id=" << PrintId(params.dst_instance_id);
+  FAULT_INJECTION_RPC_DELAY(RPC_PUBLISHFILTER);
+  // Look up the QueryState through the query id derived from the instance id.
+  QueryState::ScopedRef qs(GetQueryId(params.dst_instance_id));
+  if (qs.get() == nullptr) return;
+  FragmentInstanceState* fis = qs->GetFInstanceState(params.dst_instance_id);
+  if (fis == nullptr) return;
+  fis->PublishFilter(params.filter_id, params.bloom_filter);
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/impala-internal-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-internal-service.h 
b/be/src/service/impala-internal-service.h
index af54c35..372d617 100644
--- a/be/src/service/impala-internal-service.h
+++ b/be/src/service/impala-internal-service.h
@@ -20,65 +20,33 @@
 
 #include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
-#include "service/impala-server.h"
-#include "service/fragment-mgr.h"
-#include "testutil/fault-injection-util.h"
 
 namespace impala {
 
+class ImpalaServer;
+class QueryExecMgr;
+
 /// Proxies Thrift RPC requests onto their implementing objects for the
 /// ImpalaInternalService service.
 class ImpalaInternalService : public ImpalaInternalServiceIf {
  public:
-  ImpalaInternalService() {
-    impala_server_ = ExecEnv::GetInstance()->impala_server();
-    DCHECK(impala_server_ != nullptr);
-    fragment_mgr_ = ExecEnv::GetInstance()->fragment_mgr();
-    DCHECK(fragment_mgr_ != nullptr);
-  }
-
+  ImpalaInternalService();
   virtual void ExecPlanFragment(TExecPlanFragmentResult& return_val,
-      const TExecPlanFragmentParams& params) {
-    FAULT_INJECTION_RPC_DELAY(RPC_EXECPLANFRAGMENT);
-    fragment_mgr_->ExecPlanFragment(params).SetTStatus(&return_val);
-  }
-
+      const TExecPlanFragmentParams& params);
   virtual void CancelPlanFragment(TCancelPlanFragmentResult& return_val,
-      const TCancelPlanFragmentParams& params) {
-    FAULT_INJECTION_RPC_DELAY(RPC_CANCELPLANFRAGMENT);
-    fragment_mgr_->CancelPlanFragment(return_val, params);
-  }
-
+      const TCancelPlanFragmentParams& params);
   virtual void ReportExecStatus(TReportExecStatusResult& return_val,
-      const TReportExecStatusParams& params) {
-    FAULT_INJECTION_RPC_DELAY(RPC_REPORTEXECSTATUS);
-    impala_server_->ReportExecStatus(return_val, params);
-  }
-
+      const TReportExecStatusParams& params);
   virtual void TransmitData(TTransmitDataResult& return_val,
-      const TTransmitDataParams& params) {
-    FAULT_INJECTION_RPC_DELAY(RPC_TRANSMITDATA);
-    impala_server_->TransmitData(return_val, params);
-  }
-
+      const TTransmitDataParams& params);
   virtual void UpdateFilter(TUpdateFilterResult& return_val,
-      const TUpdateFilterParams& params) {
-    FAULT_INJECTION_RPC_DELAY(RPC_UPDATEFILTER);
-    impala_server_->UpdateFilter(return_val, params);
-  }
-
+      const TUpdateFilterParams& params);
   virtual void PublishFilter(TPublishFilterResult& return_val,
-      const TPublishFilterParams& params) {
-    FAULT_INJECTION_RPC_DELAY(RPC_PUBLISHFILTER);
-    fragment_mgr_->PublishFilter(return_val, params);
-  }
+      const TPublishFilterParams& params);
 
  private:
-  /// Manages fragment reporting and data transmission
   ImpalaServer* impala_server_;
-
-  /// Manages fragment execution
-  FragmentMgr* fragment_mgr_;
+  QueryExecMgr* query_exec_mgr_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 5e08224..a5f176b 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -54,7 +54,6 @@
 #include "runtime/lib-cache.h"
 #include "runtime/timestamp-value.h"
 #include "runtime/tmp-file-mgr.h"
-#include "service/fragment-exec-state.h"
 #include "service/impala-internal-service.h"
 #include "service/impala-http-handler.h"
 #include "service/query-exec-state.h"
@@ -72,6 +71,8 @@
 #include "util/string-parser.h"
 #include "util/summary-util.h"
 #include "util/uid-util.h"
+#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 
 #include "gen-cpp/Types_types.h"
 #include "gen-cpp/ImpalaService.h"
@@ -742,7 +743,7 @@ void ImpalaServer::AddPoolQueryOptions(TQueryCtx* ctx,
            << " override_options_mask=" << override_options_mask.to_string()
            << " set_pool_mask=" << set_pool_options_mask.to_string()
            << " overlay_mask=" << overlay_mask.to_string();
-  OverlayQueryOptions(pool_options, overlay_mask, &ctx->request.query_options);
+  OverlayQueryOptions(pool_options, overlay_mask, 
&ctx->client_request.query_options);
 }
 
 Status ImpalaServer::Execute(TQueryCtx* query_ctx,
@@ -752,9 +753,9 @@ Status ImpalaServer::Execute(TQueryCtx* query_ctx,
   ImpaladMetrics::IMPALA_SERVER_NUM_QUERIES->Increment(1L);
 
   // Redact the SQL stmt and update the query context
-  string stmt = replace_all_copy(query_ctx->request.stmt, "\n", " ");
+  string stmt = replace_all_copy(query_ctx->client_request.stmt, "\n", " ");
   Redact(&stmt);
-  query_ctx->request.__set_redacted_stmt((const string) stmt);
+  query_ctx->client_request.__set_redacted_stmt((const string) stmt);
 
   bool registered_exec_state;
   Status status = ExecuteInternal(*query_ctx, session_state, 
&registered_exec_state,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index d87af39..e2f9c67 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -45,6 +45,7 @@
 #include "runtime/runtime-state.h"
 #include "runtime/timestamp-value.h"
 #include "runtime/types.h"
+#include "statestore/statestore-subscriber.h"
 
 namespace impala {
 
@@ -259,8 +260,9 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
   ///  - incoming_topic_deltas: all changes to registered statestore topics
   ///  - subscriber_topic_updates: output parameter to publish any topic 
updates to.
   ///                              Currently unused.
-  void MembershipCallback(const StatestoreSubscriber::TopicDeltaMap&
-      incoming_topic_deltas, std::vector<TTopicDelta>* 
subscriber_topic_updates);
+  void MembershipCallback(
+      const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
+      std::vector<TTopicDelta>* subscriber_topic_updates);
 
   void CatalogUpdateCallback(const StatestoreSubscriber::TopicDeltaMap& 
topic_deltas,
       std::vector<TTopicDelta>* topic_updates);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc 
b/be/src/service/query-exec-state.cc
index 6075914..6fa6d4a 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -24,6 +24,8 @@
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
+#include "runtime/exec-env.h"
+#include "scheduling/scheduler.h"
 #include "service/frontend.h"
 #include "service/impala-server.h"
 #include "service/query-options.h"
@@ -114,7 +116,7 @@ ImpalaServer::QueryExecState::QueryExecState(
   summary_profile_.AddInfoString("Network Address",
       lexical_cast<string>(session_->network_address));
   summary_profile_.AddInfoString("Default Db", default_db());
-  summary_profile_.AddInfoString("Sql Statement", query_ctx_.request.stmt);
+  summary_profile_.AddInfoString("Sql Statement", 
query_ctx_.client_request.stmt);
   summary_profile_.AddInfoString("Coordinator",
       TNetworkAddressToString(exec_env->backend_address()));
 }
@@ -145,7 +147,7 @@ Status ImpalaServer::QueryExecState::Exec(TExecRequest* 
exec_request) {
   summary_profile_.AddInfoString("Query Type", PrintTStmtType(stmt_type()));
   summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));
   summary_profile_.AddInfoString("Query Options (non default)",
-      DebugQueryOptions(query_ctx_.request.query_options));
+      DebugQueryOptions(query_ctx_.client_request.query_options));
 
   switch (exec_request->stmt_type) {
     case TStmtType::QUERY:
@@ -615,7 +617,7 @@ void ImpalaServer::QueryExecState::Wait() {
     UpdateQueryStatus(status);
   }
   if (status.ok()) {
-    UpdateNonErrorQueryState(QueryState::FINISHED);
+    UpdateNonErrorQueryState(beeswax::QueryState::FINISHED);
   }
 }
 
@@ -691,16 +693,17 @@ Status ImpalaServer::QueryExecState::RestartFetch() {
   return Status::OK();
 }
 
-void ImpalaServer::QueryExecState::UpdateNonErrorQueryState(QueryState::type 
query_state) {
+void ImpalaServer::QueryExecState::UpdateNonErrorQueryState(
+    beeswax::QueryState::type query_state) {
   lock_guard<mutex> l(lock_);
-  DCHECK(query_state != QueryState::EXCEPTION);
+  DCHECK(query_state != beeswax::QueryState::EXCEPTION);
   if (query_state_ < query_state) query_state_ = query_state;
 }
 
 Status ImpalaServer::QueryExecState::UpdateQueryStatus(const Status& status) {
   // Preserve the first non-ok status
   if (!status.ok() && query_status_.ok()) {
-    query_state_ = QueryState::EXCEPTION;
+    query_state_ = beeswax::QueryState::EXCEPTION;
     query_status_ = status;
     summary_profile_.AddInfoString("Query Status", query_status_.GetDetail());
   }
@@ -710,12 +713,12 @@ Status 
ImpalaServer::QueryExecState::UpdateQueryStatus(const Status& status) {
 
 Status ImpalaServer::QueryExecState::FetchRowsInternal(const int32_t max_rows,
     QueryResultSet* fetched_rows) {
-  DCHECK(query_state_ != QueryState::EXCEPTION);
+  DCHECK(query_state_ != beeswax::QueryState::EXCEPTION);
 
   if (eos_) return Status::OK();
 
   if (request_result_set_ != NULL) {
-    query_state_ = QueryState::FINISHED;
+    query_state_ = beeswax::QueryState::FINISHED;
     int num_rows = 0;
     const vector<TResultRow>& all_rows = (*(request_result_set_.get()));
     // max_rows <= 0 means no limit
@@ -743,7 +746,7 @@ Status 
ImpalaServer::QueryExecState::FetchRowsInternal(const int32_t max_rows,
     if (num_rows_fetched_from_cache >= max_rows) return Status::OK();
   }
 
-  query_state_ = QueryState::FINISHED;  // results will be ready after this 
call
+  query_state_ = beeswax::QueryState::FINISHED;  // results will be ready 
after this call
 
   // Maximum number of rows to be fetched from the coord.
   int32_t max_coord_rows = max_rows;
@@ -847,12 +850,12 @@ Status ImpalaServer::QueryExecState::Cancel(bool 
check_inflight, const Status* c
   {
     lock_guard<mutex> lock(lock_);
     // If the query is completed or cancelled, no need to update state.
-    bool already_done = eos_ || query_state_ == QueryState::EXCEPTION;
+    bool already_done = eos_ || query_state_ == beeswax::QueryState::EXCEPTION;
     if (!already_done && cause != NULL) {
       DCHECK(!cause->ok());
       UpdateQueryStatus(*cause);
       query_events_->MarkEvent("Cancelled");
-      DCHECK_EQ(query_state_, QueryState::EXCEPTION);
+      DCHECK_EQ(query_state_, beeswax::QueryState::EXCEPTION);
     }
     // Get a copy of the coordinator pointer while holding 'lock_'.
     coord = coord_.get();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/service/query-exec-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.h 
b/be/src/service/query-exec-state.h
index 28c8cba..20cd7bf 100644
--- a/be/src/service/query-exec-state.h
+++ b/be/src/service/query-exec-state.h
@@ -184,8 +184,10 @@ class ImpalaServer::QueryExecState {
   const RuntimeProfile& summary_profile() const { return summary_profile_; }
   const TimestampValue& start_time() const { return start_time_; }
   const TimestampValue& end_time() const { return end_time_; }
-  const std::string& sql_stmt() const { return query_ctx_.request.stmt; }
-  const TQueryOptions& query_options() const { return 
query_ctx_.request.query_options; }
+  const std::string& sql_stmt() const { return query_ctx_.client_request.stmt; 
}
+  const TQueryOptions& query_options() const {
+    return query_ctx_.client_request.query_options;
+  }
   /// Returns 0:0 if this is a root query
   TUniqueId parent_query_id() const { return query_ctx_.parent_query_id; }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/testutil/desc-tbl-builder.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/desc-tbl-builder.h 
b/be/src/testutil/desc-tbl-builder.h
index 9ad92b1..9d27092 100644
--- a/be/src/testutil/desc-tbl-builder.h
+++ b/be/src/testutil/desc-tbl-builder.h
@@ -18,13 +18,18 @@
 #ifndef IMPALA_TESTUTIL_ROW_DESC_BUILDER_H_
 #define IMPALA_TESTUTIL_ROW_DESC_BUILDER_H_
 
+#include <vector>
+
 #include "runtime/runtime-state.h"
 #include "runtime/types.h"
+#include "gen-cpp/Descriptors_types.h"
 
 namespace impala {
 
+class Frontend;
 class ObjectPool;
 class TupleDescBuilder;
+class DescriptorTbl;
 
 /// Aids in the construction of a DescriptorTbl by declaring tuples and slots
 /// associated with those tuples.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/udf/udf.cc
----------------------------------------------------------------------
diff --git a/be/src/udf/udf.cc b/be/src/udf/udf.cc
index 9c3b131..cf1fa0f 100644
--- a/be/src/udf/udf.cc
+++ b/be/src/udf/udf.cc
@@ -93,7 +93,7 @@ class RuntimeState {
   }
 
   const std::string connected_user() const { return ""; }
-  const std::string effective_user() const { return ""; }
+  const std::string GetEffectiveUser() const { return ""; }
 };
 
 }
@@ -249,7 +249,7 @@ const char* FunctionContext::user() const {
 
 const char* FunctionContext::effective_user() const {
   if (impl_->state_ == NULL) return NULL;
-  return impl_->state_->effective_user().c_str();
+  return impl_->state_->GetEffectiveUser().c_str();
 }
 
 FunctionContext::UniqueId FunctionContext::query_id() const {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/util/container-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/container-util.h b/be/src/util/container-util.h
index efb4f52..80ec984 100644
--- a/be/src/util/container-util.h
+++ b/be/src/util/container-util.h
@@ -22,6 +22,7 @@
 #include <map>
 #include <unordered_map>
 #include <boost/unordered_map.hpp>
+#include <boost/functional/hash.hpp>
 
 #include "util/hash-util.h"
 #include "gen-cpp/Types_types.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/util/thread.h
----------------------------------------------------------------------
diff --git a/be/src/util/thread.h b/be/src/util/thread.h
index 4e2b65d..a142a94 100644
--- a/be/src/util/thread.h
+++ b/be/src/util/thread.h
@@ -103,6 +103,11 @@ class Thread {
   /// will be unregistered with the ThreadMgr and will not appear in the debug 
UI.
   void Join() const { thread_->join(); }
 
+  /// Detaches the underlying thread from this Thread object. It's illegal to call
+  /// Join() after calling Detach(). When the underlying thread finishes execution,
+  /// it unregisters itself from the ThreadMgr.
+  void Detach() const { thread_->detach(); }
+
   /// The thread ID assigned to this thread by the operating system. If the OS does not
   /// support retrieving the tid, returns Thread::INVALID_THREAD_ID.
   int64_t tid() const { return tid_; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/util/uid-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/uid-util.h b/be/src/util/uid-util.h
index de18464..b532278 100644
--- a/be/src/util/uid-util.h
+++ b/be/src/util/uid-util.h
@@ -29,14 +29,28 @@
 
 namespace impala {
 
-/// This function must be called 'hash_value' to be picked up by boost.
-inline std::size_t hash_value(const impala::TUniqueId& id) {
+inline std::size_t hash_value(const TUniqueId& id) {
   std::size_t seed = 0;
   boost::hash_combine(seed, id.lo);
   boost::hash_combine(seed, id.hi);
   return seed;
 }
 
+}
+
+/// Specialization of std::hash for impala::TUniqueId, so that TUniqueId can be used
+/// as the key type of std:: unordered containers without naming a hasher explicitly.
+/// Delegates to impala::hash_value(), so it yields the same hash value as that
+/// function for a given id.
+namespace std {
+
+template<> struct hash<impala::TUniqueId> {
+  std::size_t operator()(const impala::TUniqueId& id) const {
+    return impala::hash_value(id);
+  }
+};
+
+}
+
+namespace impala {
+
 inline void UUIDToTUniqueId(const boost::uuids::uuid& uuid, TUniqueId* 
unique_id) {
   memcpy(&(unique_id->hi), &uuid.data[0], 8);
   memcpy(&(unique_id->lo), &uuid.data[8], 8);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift 
b/common/thrift/ImpalaInternalService.thrift
index 5e79c07..bb4251b 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -267,8 +267,7 @@ struct TClientRequest {
 // TODO: Separate into FE/BE initialized vars.
 struct TQueryCtx {
   // Client request containing stmt to execute and query options.
-  // TODO: rename to client_request, we have too many requests
-  1: required TClientRequest request
+  1: required TClientRequest client_request
 
   // A globally unique id assigned to the entire query in the BE.
   // The bottom 4 bytes are 0 (for details see be/src/util/uid-util.h).

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java 
b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
index e720867..d57c25f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
@@ -72,7 +72,7 @@ public class AnalysisContext {
     // expr rewrites can be disabled via a query option. When rewrites are enabled
     // BetweenPredicates should be rewritten first to help trigger other rules.
     List<ExprRewriteRule> rules = 
Lists.newArrayList(BetweenToCompoundRule.INSTANCE);
-    if (queryCtx.getRequest().getQuery_options().enable_expr_rewrites) {
+    if (queryCtx.getClient_request().getQuery_options().enable_expr_rewrites) {
       rules.add(FoldConstantsRule.INSTANCE);
       rules.add(ExtractCommonConjunctRule.INSTANCE);
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java 
b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 61d1c20..8bea3aa 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -2285,7 +2285,7 @@ public class Analyzer {
   public User getUser() { return user_; }
   public TQueryCtx getQueryCtx() { return globalState_.queryCtx; }
   public TQueryOptions getQueryOptions() {
-    return globalState_.queryCtx.getRequest().getQuery_options();
+    return globalState_.queryCtx.client_request.getQuery_options();
   }
   public AuthorizationConfig getAuthzConfig() { return 
globalState_.authzConfig; }
   public ListMap<TNetworkAddress> getHostIndex() { return 
globalState_.hostIndex; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java
----------------------------------------------------------------------
diff --git 
a/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java 
b/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java
index 92b7106..dac4e8f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java
@@ -367,10 +367,10 @@ public class ColumnLineageGraph {
     Preconditions.checkNotNull(analyzer);
     Preconditions.checkState(analyzer.isRootAnalyzer());
     TQueryCtx queryCtx = analyzer.getQueryCtx();
-    if (queryCtx.request.isSetRedacted_stmt()) {
-      queryStr_ = queryCtx.request.redacted_stmt;
+    if (queryCtx.client_request.isSetRedacted_stmt()) {
+      queryStr_ = queryCtx.client_request.redacted_stmt;
     } else {
-      queryStr_ = queryCtx.request.stmt;
+      queryStr_ = queryCtx.client_request.stmt;
     }
     Preconditions.checkNotNull(queryStr_);
     timestamp_ = queryCtx.start_unix_millis / 1000;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
index 74399d0..e3f4e6a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
@@ -641,7 +641,7 @@ public class SelectStmt extends QueryStmt {
 
     // Optionally rewrite all count(distinct <expr>) into equivalent NDV() 
calls.
     ExprSubstitutionMap ndvSmap = null;
-    if (analyzer.getQueryCtx().getRequest().query_options.appx_count_distinct) 
{
+    if 
(analyzer.getQueryCtx().client_request.query_options.appx_count_distinct) {
       ndvSmap = new ExprSubstitutionMap();
       for (FunctionCallExpr aggExpr: aggExprs) {
         if (!aggExpr.isDistinct()


Reply via email to