IMPALA-5147: Add the ability to exclude hosts from query execution

This commit introduces a new startup option, 'is_executor', that
determines whether an impalad process can execute query fragments.
The option controls whether a host is included in the scheduler's
executor configuration and, hence, whether it is considered in
scheduling decisions.
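
As an illustrative sketch only (the gflags-style syntax and the default
value are assumptions, not verified by this change), a mixed cluster
might be started as:

  # Dedicated coordinator: accepts client requests but is excluded
  # from fragment execution.
  impalad --is_executor=false ...

  # Regular backend that can execute fragments (assumed default).
  impalad --is_executor=true ...

A host started with is_executor=false still registers with the
statestore, but the scheduler omits it from its executor configuration.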

Testing:
- Added a custom cluster test.
- Added a new scheduler test.

Change-Id: I5d2ff7f341c9d2b0649e4d14561077e166ad7c4d
Reviewed-on: http://gerrit.cloudera.org:8080/6628
Reviewed-by: Dimitris Tsirogiannis <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/e2c53a8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/e2c53a8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/e2c53a8b

Branch: refs/heads/master
Commit: e2c53a8bdf646331b29c3de921b681d0d885c82e
Parents: 5809317
Author: Dimitris Tsirogiannis <[email protected]>
Authored: Thu Apr 13 11:18:47 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Wed Apr 26 01:45:40 2017 +0000

----------------------------------------------------------------------
 be/src/scheduling/scheduler-test-util.cc  |  13 +-
 be/src/scheduling/scheduler-test-util.h   |  15 +-
 be/src/scheduling/scheduler-test.cc       |  21 ++
 be/src/scheduling/scheduler.cc            | 285 ++++++++++++-------------
 be/src/scheduling/scheduler.h             | 220 ++++++++++---------
 be/src/service/impala-http-handler.cc     |  19 ++
 be/src/service/impala-http-handler.h      |  10 +
 be/src/service/impala-server.cc           |  24 ++-
 be/src/service/impala-server.h            |  28 ++-
 be/src/util/network-util.cc               |   2 +
 be/src/util/webserver.cc                  |  10 +-
 bin/start-impala-cluster.py               |  27 ++-
 common/thrift/StatestoreService.thrift    |  13 +-
 tests/common/custom_cluster_test_suite.py |   7 +-
 tests/custom_cluster/test_coordinators.py |  34 +++
 www/backends.tmpl                         |   8 +-
 www/root.tmpl                             |   3 +-
 17 files changed, 432 insertions(+), 307 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/scheduling/scheduler-test-util.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test-util.cc 
b/be/src/scheduling/scheduler-test-util.cc
index 782379c..7547782 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -58,14 +58,14 @@ const string Cluster::IP_PREFIX = "10";
 /// Default size for new blocks is 1MB.
 const int64_t Block::DEFAULT_BLOCK_SIZE = 1 << 20;
 
-int Cluster::AddHost(bool has_backend, bool has_datanode) {
+int Cluster::AddHost(bool has_backend, bool has_datanode, bool is_executor) {
   int host_idx = hosts_.size();
   int be_port = has_backend ? BACKEND_PORT : -1;
   int dn_port = has_datanode ? DATANODE_PORT : -1;
   IpAddr ip = HostIdxToIpAddr(host_idx);
   DCHECK(ip_to_idx_.find(ip) == ip_to_idx_.end());
   ip_to_idx_[ip] = host_idx;
-  hosts_.push_back(Host(HostIdxToHostname(host_idx), ip, be_port, dn_port));
+  hosts_.push_back(Host(HostIdxToHostname(host_idx), ip, be_port, dn_port, 
is_executor));
   // Add host to lists of backend indexes per type.
   if (has_backend) backend_host_idxs_.push_back(host_idx);
   if (has_datanode) {
@@ -79,8 +79,9 @@ int Cluster::AddHost(bool has_backend, bool has_datanode) {
   return host_idx;
 }
 
-void Cluster::AddHosts(int num_hosts, bool has_backend, bool has_datanode) {
-  for (int i = 0; i < num_hosts; ++i) AddHost(has_backend, has_datanode);
+void Cluster::AddHosts(int num_hosts, bool has_backend, bool has_datanode,
+    bool is_executor) {
+  for (int i = 0; i < num_hosts; ++i) AddHost(has_backend, has_datanode, 
is_executor);
 }
 
 Hostname Cluster::HostIdxToHostname(int host_idx) {
@@ -454,7 +455,7 @@ Status SchedulerWrapper::Compute(bool exec_at_coord, 
Result* result) {
 
   // Compute Assignment.
   FragmentScanRangeAssignment* assignment = result->AddAssignment();
-  return 
scheduler_->ComputeScanRangeAssignment(*scheduler_->GetBackendConfig(), 0,
+  return 
scheduler_->ComputeScanRangeAssignment(*scheduler_->GetExecutorsConfig(), 0,
       nullptr, false, plan_.scan_range_locations(), 
plan_.referenced_datanodes(),
       exec_at_coord, plan_.query_options(), nullptr, assignment);
 }
@@ -519,6 +520,8 @@ void SchedulerWrapper::AddHostToTopicDelta(const Host& 
host, TTopicDelta* delta)
   be_desc.address.hostname = host.ip;
   be_desc.address.port = host.be_port;
   be_desc.ip_address = host.ip;
+  be_desc.__set_is_coordinator(host.is_coordinator);
+  be_desc.__set_is_executor(host.is_executor);
 
   // Build topic item.
   TTopicItem item;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/scheduling/scheduler-test-util.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test-util.h 
b/be/src/scheduling/scheduler-test-util.h
index 6cce021..c3d6b1c 100644
--- a/be/src/scheduling/scheduler-test-util.h
+++ b/be/src/scheduling/scheduler-test-util.h
@@ -86,13 +86,17 @@ enum class ReplicaPlacement {
 
 /// Host model. Each host can have either a backend, a datanode, or both. To 
specify that
 /// a host should not act as a backend or datanode specify '-1' as the 
respective port.
+/// A host with a backend is always a coordinator but it may not be an 
executor.
 struct Host {
-  Host(const Hostname& name, const IpAddr& ip, int be_port, int dn_port)
-    : name(name), ip(ip), be_port(be_port), dn_port(dn_port) {}
+  Host(const Hostname& name, const IpAddr& ip, int be_port, int dn_port, bool 
is_executor)
+    : name(name), ip(ip), be_port(be_port), dn_port(dn_port), 
is_coordinator(true),
+      is_executor(is_executor) {}
   Hostname name;
   IpAddr ip;
   int be_port; // Backend port
   int dn_port; // Datanode port
+  bool is_coordinator; // True if this is a coordinator host
+  bool is_executor; // True if this is an executor host
 };
 
 /// A cluster stores a list of hosts and provides various methods to add hosts 
to the
@@ -101,10 +105,13 @@ class Cluster {
  public:
   /// Add a host and return the host's index. 'hostname' and 'ip' of the new 
host will be
   /// generated and are guaranteed to be unique.
-  int AddHost(bool has_backend, bool has_datanode);
+  /// TODO: Refactor the construction of a host and its addition to a cluster 
to
+  /// avoid the boolean input parameters.
+  int AddHost(bool has_backend, bool has_datanode, bool is_executor = true);
 
   /// Add a number of hosts with the same properties by repeatedly calling 
AddHost(..).
-  void AddHosts(int num_hosts, bool has_backend, bool has_datanode);
+  void AddHosts(int num_hosts, bool has_backend, bool has_datanode,
+      bool is_executor = true);
 
   /// Convert a host index to a hostname.
   static Hostname HostIdxToHostname(int host_idx);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/scheduling/scheduler-test.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test.cc 
b/be/src/scheduling/scheduler-test.cc
index 3e05c5b..c7b284b 100644
--- a/be/src/scheduling/scheduler-test.cc
+++ b/be/src/scheduling/scheduler-test.cc
@@ -52,6 +52,27 @@ TEST_F(SchedulerTest, SingleHostSingleFile) {
   EXPECT_EQ(0, result.NumCachedAssignments());
 }
 
+/// Test cluster configuration with one coordinator that can't process scan 
ranges.
+TEST_F(SchedulerTest, SingleCoordinatorNoExecutor) {
+  Cluster cluster;
+  cluster.AddHost(true, true, false);
+  cluster.AddHost(true, true, true);
+  cluster.AddHost(true, true, true);
+
+  Schema schema(cluster);
+  schema.AddMultiBlockTable("T1", 10, ReplicaPlacement::LOCAL_ONLY, 3);
+
+  Plan plan(schema);
+  plan.AddTableScan("T1");
+
+  Result result(plan);
+  SchedulerWrapper scheduler(plan);
+  scheduler.Compute(&result);
+
+  EXPECT_EQ(2, result.NumDistinctBackends());
+  EXPECT_EQ(0, result.NumDiskAssignments(0));
+}
+
 /// Test assigning all scan ranges to the coordinator.
 TEST_F(SchedulerTest, ExecAtCoord) {
   Cluster cluster;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index a73e13a..bca5965 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -29,7 +29,6 @@
 #include "common/logging.h"
 #include "gen-cpp/ImpalaInternalService_constants.h"
 #include "gen-cpp/Types_types.h"
-#include "rapidjson/rapidjson.h"
 #include "runtime/exec-env.h"
 #include "statestore/statestore-subscriber.h"
 #include "util/container-util.h"
@@ -41,7 +40,6 @@
 
 using boost::algorithm::join;
 using namespace apache::thrift;
-using namespace rapidjson;
 using namespace strings;
 
 DECLARE_int32(be_port);
@@ -54,15 +52,12 @@ static const string 
ASSIGNMENTS_KEY("simple-scheduler.assignments.total");
 static const string SCHEDULER_INIT_KEY("simple-scheduler.initialized");
 static const string NUM_BACKENDS_KEY("simple-scheduler.num-backends");
 
-static const string BACKENDS_WEB_PAGE = "/backends";
-static const string BACKENDS_TEMPLATE = "backends.tmpl";
-
 const string Scheduler::IMPALA_MEMBERSHIP_TOPIC("impala-membership");
 
 Scheduler::Scheduler(StatestoreSubscriber* subscriber, const string& 
backend_id,
     const TNetworkAddress& backend_address, MetricGroup* metrics, Webserver* 
webserver,
     RequestPoolService* request_pool_service)
-  : backend_config_(std::make_shared<const BackendConfig>()),
+  : executors_config_(std::make_shared<const BackendConfig>()),
     metrics_(metrics->GetOrCreateChildGroup("scheduler")),
     webserver_(webserver),
     statestore_subscriber_(subscriber),
@@ -77,7 +72,7 @@ Scheduler::Scheduler(StatestoreSubscriber* subscriber, const 
string& backend_id,
 
 Scheduler::Scheduler(const vector<TNetworkAddress>& backends, MetricGroup* 
metrics,
     Webserver* webserver, RequestPoolService* request_pool_service)
-  : backend_config_(std::make_shared<const BackendConfig>(backends)),
+  : executors_config_(std::make_shared<const BackendConfig>(backends)),
     metrics_(metrics),
     webserver_(webserver),
     statestore_subscriber_(nullptr),
@@ -109,13 +104,6 @@ Status Scheduler::Init() {
 
   coord_only_backend_config_.AddBackend(local_backend_descriptor_);
 
-  if (webserver_ != nullptr) {
-    Webserver::UrlCallback backends_callback =
-        bind<void>(mem_fn(&Scheduler::BackendsUrlCallback), this, _1, _2);
-    webserver_->RegisterUrlCallback(
-        BACKENDS_WEB_PAGE, BACKENDS_TEMPLATE, backends_callback);
-  }
-
   if (statestore_subscriber_ != nullptr) {
     StatestoreSubscriber::UpdateCallback cb =
         bind<void>(mem_fn(&Scheduler::UpdateMembership), this, _1, _2);
@@ -128,8 +116,8 @@ Status Scheduler::Init() {
 
   if (metrics_ != nullptr) {
     // This is after registering with the statestored, so we already have to 
synchronize
-    // access to the backend_config_ shared_ptr.
-    int num_backends = GetBackendConfig()->NumBackends();
+    // access to the executors_config_ shared_ptr.
+    int num_backends = GetExecutorsConfig()->NumBackends();
     total_assignments_ = metrics_->AddCounter<int64_t>(ASSIGNMENTS_KEY, 0);
     total_local_assignments_ = 
metrics_->AddCounter<int64_t>(LOCAL_ASSIGNMENTS_KEY, 0);
     initialized_ = metrics_->AddProperty(SCHEDULER_INIT_KEY, true);
@@ -152,20 +140,6 @@ Status Scheduler::Init() {
   return Status::OK();
 }
 
-void Scheduler::BackendsUrlCallback(
-    const Webserver::ArgumentMap& args, Document* document) {
-  BackendConfig::BackendList backends;
-  BackendConfigPtr backend_config = GetBackendConfig();
-  backend_config->GetAllBackends(&backends);
-  Value backends_list(kArrayType);
-  for (const TBackendDescriptor& backend : backends) {
-    Value str(TNetworkAddressToString(backend.address).c_str(), 
document->GetAllocator());
-    backends_list.PushBack(str, document->GetAllocator());
-  }
-
-  document->AddMember("backends", backends_list, document->GetAllocator());
-}
-
 void Scheduler::UpdateMembership(
     const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
     vector<TTopicDelta>* subscriber_topic_updates) {
@@ -177,26 +151,29 @@ void Scheduler::UpdateMembership(
   const TTopicDelta& delta = topic->second;
 
   // If the delta transmitted by the statestore is empty we can skip processing
-  // altogether and avoid making a copy of backend_config_.
+  // altogether and avoid making a copy of executors_config_.
   if (delta.is_delta && delta.topic_entries.empty() && 
delta.topic_deletions.empty()) {
     return;
   }
 
   // This function needs to handle both delta and non-delta updates. To 
minimize the
-  // time needed to hold locks, all updates are applied to a copy of 
backend_config_,
+  // time needed to hold locks, all updates are applied to a copy of
+  // executors_config_,
   // which is then swapped into place atomically.
-  std::shared_ptr<BackendConfig> new_backend_config;
+  std::shared_ptr<BackendConfig> new_executors_config;
 
   if (!delta.is_delta) {
-    current_membership_.clear();
-    new_backend_config = std::make_shared<BackendConfig>();
+    current_executors_.clear();
+    new_executors_config = std::make_shared<BackendConfig>();
   } else {
     // Make a copy
-    lock_guard<mutex> lock(backend_config_lock_);
-    new_backend_config = std::make_shared<BackendConfig>(*backend_config_);
+    lock_guard<mutex> lock(executors_config_lock_);
+    new_executors_config = std::make_shared<BackendConfig>(*executors_config_);
   }
 
-  // Process new entries to the topic
+  // Process new entries to the topic. Update executors_config_ and
+  // current_executors_ to match the set of executors given by the
+  // subscriber_topic_updates.
   for (const TTopicItem& item : delta.topic_entries) {
     TBackendDescriptor be_desc;
     // Benchmarks have suggested that this method can deserialize
@@ -225,42 +202,44 @@ void Scheduler::UpdateMembership(
                                << be_desc.address;
       continue;
     }
-    new_backend_config->AddBackend(be_desc);
-    current_membership_.insert(make_pair(item.key, be_desc));
+    if (be_desc.is_executor) {
+      new_executors_config->AddBackend(be_desc);
+      current_executors_.insert(make_pair(item.key, be_desc));
+    }
   }
 
   // Process deletions from the topic
   for (const string& backend_id : delta.topic_deletions) {
-    if (current_membership_.find(backend_id) != current_membership_.end()) {
-      new_backend_config->RemoveBackend(current_membership_[backend_id]);
-      current_membership_.erase(backend_id);
+    if (current_executors_.find(backend_id) != current_executors_.end()) {
+      new_executors_config->RemoveBackend(current_executors_[backend_id]);
+      current_executors_.erase(backend_id);
     }
   }
 
-  SetBackendConfig(new_backend_config);
+  SetExecutorsConfig(new_executors_config);
 
   if (metrics_ != nullptr) {
     /// TODO-MT: fix this (do we even need to report it?)
-    num_fragment_instances_metric_->set_value(current_membership_.size());
+    num_fragment_instances_metric_->set_value(current_executors_.size());
   }
 }
 
-Scheduler::BackendConfigPtr Scheduler::GetBackendConfig() const {
-  lock_guard<mutex> l(backend_config_lock_);
-  DCHECK(backend_config_.get() != nullptr);
-  BackendConfigPtr backend_config = backend_config_;
-  return backend_config;
+Scheduler::ExecutorsConfigPtr Scheduler::GetExecutorsConfig() const {
+  lock_guard<mutex> l(executors_config_lock_);
+  DCHECK(executors_config_.get() != nullptr);
+  ExecutorsConfigPtr executor_config = executors_config_;
+  return executor_config;
 }
 
-void Scheduler::SetBackendConfig(const BackendConfigPtr& backend_config) {
-  lock_guard<mutex> l(backend_config_lock_);
-  backend_config_ = backend_config;
+void Scheduler::SetExecutorsConfig(const ExecutorsConfigPtr& executors_config) 
{
+  lock_guard<mutex> l(executors_config_lock_);
+  executors_config_ = executors_config;
 }
 
 Status Scheduler::ComputeScanRangeAssignment(QuerySchedule* schedule) {
   RuntimeProfile::Counter* total_assignment_timer =
       ADD_TIMER(schedule->summary_profile(), 
"ComputeScanRangeAssignmentTimer");
-  BackendConfigPtr backend_config = GetBackendConfig();
+  ExecutorsConfigPtr executor_config = GetExecutorsConfig();
   const TQueryExecRequest& exec_request = schedule->request();
   for (const TPlanExecInfo& plan_exec_info : exec_request.plan_exec_info) {
     for (const auto& entry : plan_exec_info.per_node_scan_ranges) {
@@ -283,7 +262,7 @@ Status Scheduler::ComputeScanRangeAssignment(QuerySchedule* 
schedule) {
       FragmentScanRangeAssignment* assignment =
           
&schedule->GetFragmentExecParams(fragment.idx)->scan_range_assignment;
       RETURN_IF_ERROR(
-          ComputeScanRangeAssignment(*backend_config, node_id, 
node_replica_preference,
+          ComputeScanRangeAssignment(*executor_config, node_id, 
node_replica_preference,
               node_random_replica, entry.second, exec_request.host_list, 
exec_at_coord,
               schedule->query_options(), total_assignment_timer, assignment));
       schedule->IncNumScanRanges(entry.second.size());
@@ -518,13 +497,13 @@ void Scheduler::CreateCollocatedInstances(
   }
 }
 
-Status Scheduler::ComputeScanRangeAssignment(const BackendConfig& 
backend_config,
+Status Scheduler::ComputeScanRangeAssignment(const BackendConfig& 
executor_config,
     PlanNodeId node_id, const TReplicaPreference::type* 
node_replica_preference,
     bool node_random_replica, const vector<TScanRangeLocationList>& locations,
     const vector<TNetworkAddress>& host_list, bool exec_at_coord,
     const TQueryOptions& query_options, RuntimeProfile::Counter* timer,
     FragmentScanRangeAssignment* assignment) {
-  if (backend_config.NumBackends() == 0 && !exec_at_coord) {
+  if (executor_config.NumBackends() == 0 && !exec_at_coord) {
     return Status(TErrorCode::NO_REGISTERED_BACKENDS);
   }
 
@@ -544,40 +523,40 @@ Status Scheduler::ComputeScanRangeAssignment(const 
BackendConfig& backend_config
   // A preference attached to the plan node takes precedence.
   if (node_replica_preference) base_distance = *node_replica_preference;
 
-  // Between otherwise equivalent backends we optionally break ties by 
comparing their
+  // Between otherwise equivalent executors we optionally break ties by 
comparing their
   // random rank.
   bool random_replica = query_options.schedule_random_replica || 
node_random_replica;
 
   AssignmentCtx assignment_ctx(
-      exec_at_coord ? coord_only_backend_config_ : backend_config, 
total_assignments_,
+      exec_at_coord ? coord_only_backend_config_ : executor_config, 
total_assignments_,
       total_local_assignments_);
 
   vector<const TScanRangeLocationList*> remote_scan_range_locations;
 
-  // Loop over all scan ranges, select a backend for those with local impalads 
and collect
-  // all others for later processing.
+  // Loop over all scan ranges, select an executor for those with local 
impalads and
+  // collect all others for later processing.
   for (const TScanRangeLocationList& scan_range_locations : locations) {
     TReplicaPreference::type min_distance = TReplicaPreference::REMOTE;
 
-    // Select backend host for the current scan range.
+    // Select executor for the current scan range.
     if (exec_at_coord) {
-      DCHECK(assignment_ctx.backend_config().LookUpBackendIp(
+      DCHECK(assignment_ctx.executor_config().LookUpBackendIp(
           local_backend_descriptor_.address.hostname, nullptr));
       assignment_ctx.RecordScanRangeAssignment(local_backend_descriptor_, 
node_id,
           host_list, scan_range_locations, assignment);
     } else {
-      // Collect backend candidates with smallest memory distance.
-      vector<IpAddr> backend_candidates;
+      // Collect executor candidates with smallest memory distance.
+      vector<IpAddr> executor_candidates;
       if (base_distance < TReplicaPreference::REMOTE) {
         for (const TScanRangeLocation& location : 
scan_range_locations.locations) {
           const TNetworkAddress& replica_host = host_list[location.host_idx];
-          // Determine the adjusted memory distance to the closest backend for 
the replica
-          // host.
+          // Determine the adjusted memory distance to the closest executor 
for the
+          // replica host.
           TReplicaPreference::type memory_distance = 
TReplicaPreference::REMOTE;
-          IpAddr backend_ip;
-          bool has_local_backend = 
assignment_ctx.backend_config().LookUpBackendIp(
-              replica_host.hostname, &backend_ip);
-          if (has_local_backend) {
+          IpAddr executor_ip;
+          bool has_local_executor = 
assignment_ctx.executor_config().LookUpBackendIp(
+              replica_host.hostname, &executor_ip);
+          if (has_local_executor) {
             if (location.is_cached) {
               memory_distance = TReplicaPreference::CACHE_LOCAL;
             } else {
@@ -588,58 +567,58 @@ Status Scheduler::ComputeScanRangeAssignment(const 
BackendConfig& backend_config
           }
           memory_distance = max(memory_distance, base_distance);
 
-          // We only need to collect backend candidates for non-remote reads, 
as it is the
-          // nature of remote reads that there is no backend available.
+          // We only need to collect executor candidates for non-remote reads, 
as it is
+          // the nature of remote reads that there is no executor available.
           if (memory_distance < TReplicaPreference::REMOTE) {
-            DCHECK(has_local_backend);
+            DCHECK(has_local_executor);
             // Check if we found a closer replica than the previous ones.
             if (memory_distance < min_distance) {
               min_distance = memory_distance;
-              backend_candidates.clear();
-              backend_candidates.push_back(backend_ip);
+              executor_candidates.clear();
+              executor_candidates.push_back(executor_ip);
             } else if (memory_distance == min_distance) {
-              backend_candidates.push_back(backend_ip);
+              executor_candidates.push_back(executor_ip);
             }
           }
         }
       } // End of candidate selection.
-      DCHECK(!backend_candidates.empty() || min_distance == 
TReplicaPreference::REMOTE);
+      DCHECK(!executor_candidates.empty() || min_distance == 
TReplicaPreference::REMOTE);
 
       // Check the effective memory distance of the candidates to decide 
whether to treat
       // the scan range as cached.
       bool cached_replica = min_distance == TReplicaPreference::CACHE_LOCAL;
 
-      // Pick backend host based on data location.
-      bool local_backend = min_distance != TReplicaPreference::REMOTE;
+      // Pick executor based on data location.
+      bool local_executor = min_distance != TReplicaPreference::REMOTE;
 
-      if (!local_backend) {
+      if (!local_executor) {
         remote_scan_range_locations.push_back(&scan_range_locations);
         continue;
       }
-      // For local reads we want to break ties by backend rank in these cases:
+      // For local reads we want to break ties by executor rank in these cases:
       // - if it is enforced via a query option.
       // - when selecting between cached replicas. In this case there is no OS 
buffer
       //   cache to worry about.
-      // Remote reads will always break ties by backend rank.
+      // Remote reads will always break ties by executor rank.
       bool decide_local_assignment_by_rank = random_replica || cached_replica;
-      const IpAddr* backend_ip = nullptr;
-      backend_ip = assignment_ctx.SelectLocalBackendHost(
-          backend_candidates, decide_local_assignment_by_rank);
-      TBackendDescriptor backend;
-      assignment_ctx.SelectBackendOnHost(*backend_ip, &backend);
+      const IpAddr* executor_ip = nullptr;
+      executor_ip = assignment_ctx.SelectLocalExecutor(
+          executor_candidates, decide_local_assignment_by_rank);
+      TBackendDescriptor executor;
+      assignment_ctx.SelectExecutorOnHost(*executor_ip, &executor);
       assignment_ctx.RecordScanRangeAssignment(
-          backend, node_id, host_list, scan_range_locations, assignment);
-    } // End of backend host selection.
+          executor, node_id, host_list, scan_range_locations, assignment);
+    } // End of executor selection.
   } // End of for loop over scan ranges.
 
-  // Assign remote scans to backends.
+  // Assign remote scans to executors.
   for (const TScanRangeLocationList* scan_range_locations : 
remote_scan_range_locations) {
     DCHECK(!exec_at_coord);
-    const IpAddr* backend_ip = assignment_ctx.SelectRemoteBackendHost();
-    TBackendDescriptor backend;
-    assignment_ctx.SelectBackendOnHost(*backend_ip, &backend);
+    const IpAddr* executor_ip = assignment_ctx.SelectRemoteExecutor();
+    TBackendDescriptor executor;
+    assignment_ctx.SelectExecutorOnHost(*executor_ip, &executor);
     assignment_ctx.RecordScanRangeAssignment(
-        backend, node_id, host_list, *scan_range_locations, assignment);
+        executor, node_id, host_list, *scan_range_locations, assignment);
   }
 
   if (VLOG_FILE_IS_ON) assignment_ctx.PrintAssignment(*assignment);
@@ -736,22 +715,22 @@ Status Scheduler::Schedule(QuerySchedule* schedule) {
   return Status::OK();
 }
 
-Scheduler::AssignmentCtx::AssignmentCtx(const BackendConfig& backend_config,
+Scheduler::AssignmentCtx::AssignmentCtx(const BackendConfig& executor_config,
     IntCounter* total_assignments, IntCounter* total_local_assignments)
-  : backend_config_(backend_config),
-    first_unused_backend_idx_(0),
+  : executors_config_(executor_config),
+    first_unused_executor_idx_(0),
     total_assignments_(total_assignments),
     total_local_assignments_(total_local_assignments) {
-  DCHECK_GT(backend_config.NumBackends(), 0);
-  backend_config.GetAllBackendIps(&random_backend_order_);
+  DCHECK_GT(executor_config.NumBackends(), 0);
+  executor_config.GetAllBackendIps(&random_executor_order_);
   std::mt19937 g(rand());
-  std::shuffle(random_backend_order_.begin(), random_backend_order_.end(), g);
-  // Initialize inverted map for backend rank lookups
+  std::shuffle(random_executor_order_.begin(), random_executor_order_.end(), 
g);
+  // Initialize inverted map for executor rank lookups
   int i = 0;
-  for (const IpAddr& ip : random_backend_order_) random_backend_rank_[ip] = 
i++;
+  for (const IpAddr& ip : random_executor_order_) random_executor_rank_[ip] = 
i++;
 }
 
-const IpAddr* Scheduler::AssignmentCtx::SelectLocalBackendHost(
+const IpAddr* Scheduler::AssignmentCtx::SelectLocalExecutor(
     const std::vector<IpAddr>& data_locations, bool break_ties_by_rank) {
   DCHECK(!data_locations.empty());
   // List of candidate indexes into 'data_locations'.
@@ -759,9 +738,9 @@ const IpAddr* 
Scheduler::AssignmentCtx::SelectLocalBackendHost(
   // Find locations with minimum number of assigned bytes.
   int64_t min_assigned_bytes = numeric_limits<int64_t>::max();
   for (int i = 0; i < data_locations.size(); ++i) {
-    const IpAddr& backend_ip = data_locations[i];
+    const IpAddr& executor_ip = data_locations[i];
     int64_t assigned_bytes = 0;
-    auto handle_it = assignment_heap_.find(backend_ip);
+    auto handle_it = assignment_heap_.find(executor_ip);
     if (handle_it != assignment_heap_.end()) {
       assigned_bytes = (*handle_it->second).assigned_bytes;
     }
@@ -777,69 +756,69 @@ const IpAddr* 
Scheduler::AssignmentCtx::SelectLocalBackendHost(
   if (break_ties_by_rank) {
     min_rank_idx = min_element(candidates_idxs.begin(), candidates_idxs.end(),
         [&data_locations, this](const int& a, const int& b) {
-          return GetBackendRank(data_locations[a]) < 
GetBackendRank(data_locations[b]);
+          return GetExecutorRank(data_locations[a]) < 
GetExecutorRank(data_locations[b]);
         });
   }
   return &data_locations[*min_rank_idx];
 }
 
-const IpAddr* Scheduler::AssignmentCtx::SelectRemoteBackendHost() {
+const IpAddr* Scheduler::AssignmentCtx::SelectRemoteExecutor() {
   const IpAddr* candidate_ip;
-  if (HasUnusedBackends()) {
-    // Pick next unused backend.
-    candidate_ip = GetNextUnusedBackendAndIncrement();
+  if (HasUnusedExecutors()) {
+    // Pick next unused executor.
+    candidate_ip = GetNextUnusedExecutorAndIncrement();
   } else {
-    // Pick next backend from assignment_heap. All backends must have been 
inserted into
+    // Pick next executor from assignment_heap. All executors must have been 
inserted into
     // the heap at this point.
-    DCHECK_GT(backend_config_.NumBackends(), 0);
-    DCHECK_EQ(backend_config_.NumBackends(), assignment_heap_.size());
+    DCHECK_GT(executors_config_.NumBackends(), 0);
+    DCHECK_EQ(executors_config_.NumBackends(), assignment_heap_.size());
     candidate_ip = &(assignment_heap_.top().ip);
   }
   DCHECK(candidate_ip != nullptr);
   return candidate_ip;
 }
 
-bool Scheduler::AssignmentCtx::HasUnusedBackends() const {
-  return first_unused_backend_idx_ < random_backend_order_.size();
+bool Scheduler::AssignmentCtx::HasUnusedExecutors() const {
+  return first_unused_executor_idx_ < random_executor_order_.size();
 }
 
-const IpAddr* Scheduler::AssignmentCtx::GetNextUnusedBackendAndIncrement() {
-  DCHECK(HasUnusedBackends());
-  const IpAddr* ip = &random_backend_order_[first_unused_backend_idx_++];
+const IpAddr* Scheduler::AssignmentCtx::GetNextUnusedExecutorAndIncrement() {
+  DCHECK(HasUnusedExecutors());
+  const IpAddr* ip = &random_executor_order_[first_unused_executor_idx_++];
   return ip;
 }
 
-int Scheduler::AssignmentCtx::GetBackendRank(const IpAddr& ip) const {
-  auto it = random_backend_rank_.find(ip);
-  DCHECK(it != random_backend_rank_.end());
+int Scheduler::AssignmentCtx::GetExecutorRank(const IpAddr& ip) const {
+  auto it = random_executor_rank_.find(ip);
+  DCHECK(it != random_executor_rank_.end());
   return it->second;
 }
 
-void Scheduler::AssignmentCtx::SelectBackendOnHost(
-    const IpAddr& backend_ip, TBackendDescriptor* backend) {
-  DCHECK(backend_config_.LookUpBackendIp(backend_ip, nullptr));
-  const BackendConfig::BackendList& backends_on_host =
-      backend_config_.GetBackendListForHost(backend_ip);
-  DCHECK(backends_on_host.size() > 0);
-  if (backends_on_host.size() == 1) {
-    *backend = *backends_on_host.begin();
+void Scheduler::AssignmentCtx::SelectExecutorOnHost(
+    const IpAddr& executor_ip, TBackendDescriptor* executor) {
+  DCHECK(executors_config_.LookUpBackendIp(executor_ip, nullptr));
+  const BackendConfig::BackendList& executors_on_host =
+      executors_config_.GetBackendListForHost(executor_ip);
+  DCHECK(executors_on_host.size() > 0);
+  if (executors_on_host.size() == 1) {
+    *executor = *executors_on_host.begin();
   } else {
-    BackendConfig::BackendList::const_iterator* next_backend_on_host;
-    next_backend_on_host =
-        FindOrInsert(&next_backend_per_host_, backend_ip, 
backends_on_host.begin());
-    DCHECK(find(backends_on_host.begin(), backends_on_host.end(), 
**next_backend_on_host)
-        != backends_on_host.end());
-    *backend = **next_backend_on_host;
+    BackendConfig::BackendList::const_iterator* next_executor_on_host;
+    next_executor_on_host =
+        FindOrInsert(&next_executor_per_host_, executor_ip, 
executors_on_host.begin());
+    DCHECK(find(executors_on_host.begin(), executors_on_host.end(),
+        **next_executor_on_host) != executors_on_host.end());
+    *executor = **next_executor_on_host;
     // Rotate
-    ++(*next_backend_on_host);
-    if (*next_backend_on_host == backends_on_host.end()) {
-      *next_backend_on_host = backends_on_host.begin();
+    ++(*next_executor_on_host);
+    if (*next_executor_on_host == executors_on_host.end()) {
+      *next_executor_on_host = executors_on_host.begin();
     }
   }
 }
 
 void Scheduler::AssignmentCtx::RecordScanRangeAssignment(
-    const TBackendDescriptor& backend, PlanNodeId node_id,
+    const TBackendDescriptor& executor, PlanNodeId node_id,
     const vector<TNetworkAddress>& host_list,
     const TScanRangeLocationList& scan_range_locations,
     FragmentScanRangeAssignment* assignment) {
@@ -852,12 +831,12 @@ void Scheduler::AssignmentCtx::RecordScanRangeAssignment(
     scan_range_length = 1000;
   }
 
-  IpAddr backend_ip;
-  bool ret = backend_config_.LookUpBackendIp(backend.address.hostname, 
&backend_ip);
+  IpAddr executor_ip;
+  bool ret = executors_config_.LookUpBackendIp(executor.address.hostname, 
&executor_ip);
   DCHECK(ret);
-  DCHECK(!backend_ip.empty());
+  DCHECK(!executor_ip.empty());
   assignment_heap_.InsertOrUpdate(
-      backend_ip, scan_range_length, GetBackendRank(backend_ip));
+      executor_ip, scan_range_length, GetExecutorRank(executor_ip));
 
   // See if the read will be remote. This is not the case if the impalad runs 
on one of
   // the replica's datanodes.
@@ -869,8 +848,8 @@ void Scheduler::AssignmentCtx::RecordScanRangeAssignment(
   for (const TScanRangeLocation& location : scan_range_locations.locations) {
     const TNetworkAddress& replica_host = host_list[location.host_idx];
     IpAddr replica_ip;
-    if (backend_config_.LookUpBackendIp(replica_host.hostname, &replica_ip)
-        && backend_ip == replica_ip) {
+    if (executors_config_.LookUpBackendIp(replica_host.hostname, &replica_ip)
+        && executor_ip == replica_ip) {
       remote_read = false;
       volume_id = location.volume_id;
       is_cached = location.is_cached;
@@ -892,7 +871,7 @@ void Scheduler::AssignmentCtx::RecordScanRangeAssignment(
   }
 
   PerNodeScanRanges* scan_ranges =
-      FindOrInsert(assignment, backend.address, PerNodeScanRanges());
+      FindOrInsert(assignment, executor.address, PerNodeScanRanges());
   vector<TScanRangeParams>* scan_range_params_list =
       FindOrInsert(scan_ranges, node_id, vector<TScanRangeParams>());
   // Add scan range.
@@ -904,7 +883,7 @@ void Scheduler::AssignmentCtx::RecordScanRangeAssignment(
   scan_range_params_list->push_back(scan_range_params);
 
   if (VLOG_FILE_IS_ON) {
-    VLOG_FILE << "Scheduler assignment to backend: " << backend.address << "("
+    VLOG_FILE << "Scheduler assignment to executor: " << executor.address << 
"("
               << (remote_read ? "remote" : "local") << " selection)";
   }
 }
@@ -932,16 +911,16 @@ void Scheduler::AssignmentCtx::PrintAssignment(
 
 void Scheduler::AddressableAssignmentHeap::InsertOrUpdate(
     const IpAddr& ip, int64_t assigned_bytes, int rank) {
-  auto handle_it = backend_handles_.find(ip);
-  if (handle_it == backend_handles_.end()) {
-    AssignmentHeap::handle_type handle = backend_heap_.push({assigned_bytes, 
rank, ip});
-    backend_handles_.emplace(ip, handle);
+  auto handle_it = executor_handles_.find(ip);
+  if (handle_it == executor_handles_.end()) {
+    AssignmentHeap::handle_type handle = executor_heap_.push({assigned_bytes, 
rank, ip});
+    executor_handles_.emplace(ip, handle);
   } else {
     // We need to rebuild the heap after every update operation. Calling 
decrease once is
     // sufficient as both assignments decrease the key.
     AssignmentHeap::handle_type handle = handle_it->second;
     (*handle).assigned_bytes += assigned_bytes;
-    backend_heap_.decrease(handle);
+    executor_heap_.decrease(handle);
   }
 }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/scheduling/scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index ca520c8..7adfb5d 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -48,12 +48,12 @@ namespace test {
 class SchedulerWrapper;
 }
 
-/// Performs simple scheduling by matching between a list of backends 
configured
+/// Performs simple scheduling by matching between a list of executor backends 
configured
 /// either from the statestore, or from a static list of addresses, and a list
-/// of target data locations. The current set of backends is stored in 
backend_config_.
-/// When receiving changes to the backend configuration from the statestore we 
will make a
-/// copy of this configuration, apply the updates to the copy and atomically 
swap the
-/// contents of the backend_config_ pointer.
+/// of target data locations. The current set of executors is stored in 
executors_config_.
+/// When receiving changes to the executor configuration from the statestore 
we will make
+/// a copy of this configuration, apply the updates to the copy and atomically 
swap the
+/// contents of the executors_config_ pointer.
 ///
 /// TODO: Notice when there are duplicate statestore registrations (IMPALA-23)
 /// TODO: Track assignments (assignment_ctx in ComputeScanRangeAssignment) per 
query
@@ -67,9 +67,9 @@ class SchedulerWrapper;
 ///       to make it testable.
 /// TODO: Benchmark the performance of the scheduler. The tests need to 
include setups
 ///       with:
-///         - Small and large number of backends.
+///         - Small and large number of executors.
 ///         - Small and large query plans.
-///         - Scheduling query plans with concurrent updates to the internal 
backend
+///         - Scheduling query plans with concurrent updates to the internal 
executor
 ///           configuration.
 class Scheduler {
  public:
@@ -102,28 +102,28 @@ class Scheduler {
   Status Schedule(QuerySchedule* schedule);
 
  private:
-  /// Map from a host's IP address to the next backend to be round-robin 
scheduled for
-  /// that host (needed for setups with multiple backends on a single host)
+  /// Map from a host's IP address to the next executor to be round-robin 
scheduled for
+  /// that host (needed for setups with multiple executors on a single host)
   typedef boost::unordered_map<IpAddr, 
BackendConfig::BackendList::const_iterator>
-      NextBackendPerHost;
+      NextExecutorPerHost;
 
-  typedef std::shared_ptr<const BackendConfig> BackendConfigPtr;
+  typedef std::shared_ptr<const BackendConfig> ExecutorsConfigPtr;
 
-  /// Internal structure to track scan range assignments for a backend host. 
This struct
+  /// Internal structure to track scan range assignments for an executor host. 
This struct
   /// is used as the heap element in and maintained by 
AddressableAssignmentHeap.
-  struct BackendAssignmentInfo {
-    /// The number of bytes assigned to a backend host.
+  struct ExecutorAssignmentInfo {
+    /// The number of bytes assigned to an executor.
     int64_t assigned_bytes;
 
     /// Each host gets assigned a random rank to break ties in a random but 
deterministic
     /// order per plan node.
     const int random_rank;
 
-    /// IP address of the backend.
+    /// IP address of the executor.
     IpAddr ip;
 
     /// Compare two elements of this struct. The key is (assigned_bytes, 
random_rank).
-    bool operator>(const BackendAssignmentInfo& rhs) const {
+    bool operator>(const ExecutorAssignmentInfo& rhs) const {
       if (assigned_bytes != rhs.assigned_bytes) {
         return assigned_bytes > rhs.assigned_bytes;
       }
@@ -132,87 +132,87 @@ class Scheduler {
   };
 
   /// Heap to compute candidates for scan range assignments. Elements are of 
type
-  /// BackendAssignmentInfo and track assignment information for each backend. 
By default
-  /// boost implements a max-heap so we use std::greater<T> to obtain a 
min-heap. This
-  /// will make the top() element of the heap be the backend with the lowest 
number of
-  /// assigned bytes and the lowest random rank.
-  typedef boost::heap::binomial_heap<BackendAssignmentInfo,
-      boost::heap::compare<std::greater<BackendAssignmentInfo>>>
+  /// ExecutorAssignmentInfo and track assignment information for each 
executor. By
+  /// default boost implements a max-heap so we use std::greater<T> to obtain 
a min-heap.
+  /// This will make the top() element of the heap be the executor with the 
lowest number
+  /// of assigned bytes and the lowest random rank.
+  typedef boost::heap::binomial_heap<ExecutorAssignmentInfo,
+      boost::heap::compare<std::greater<ExecutorAssignmentInfo>>>
       AssignmentHeap;
 
   /// Map to look up handles to heap elements to modify heap element keys.
-  typedef boost::unordered_map<IpAddr, AssignmentHeap::handle_type> 
BackendHandleMap;
+  typedef boost::unordered_map<IpAddr, AssignmentHeap::handle_type> 
ExecutorHandleMap;
 
-  /// Class to store backend information in an addressable heap. In addition to
+  /// Class to store executor information in an addressable heap. In addition 
to
   /// AssignmentHeap it can be used to look up heap elements by their IP 
address and
   /// update their key. For each plan node we create a new heap, so they are 
not shared
   /// between concurrent invocations of the scheduler.
   class AddressableAssignmentHeap {
    public:
-    const AssignmentHeap& backend_heap() const { return backend_heap_; }
-    const BackendHandleMap& backend_handles() const { return backend_handles_; 
}
+    const AssignmentHeap& executor_heap() const { return executor_heap_; }
+    const ExecutorHandleMap& executor_handles() const { return 
executor_handles_; }
 
     void InsertOrUpdate(const IpAddr& ip, int64_t assigned_bytes, int rank);
 
     // Forward interface for boost::heap
-    decltype(auto) size() const { return backend_heap_.size(); }
-    decltype(auto) top() const { return backend_heap_.top(); }
+    decltype(auto) size() const { return executor_heap_.size(); }
+    decltype(auto) top() const { return executor_heap_.top(); }
 
     // Forward interface for boost::unordered_map
-    decltype(auto) find(const IpAddr& ip) const { return 
backend_handles_.find(ip); }
-    decltype(auto) end() const { return backend_handles_.end(); }
+    decltype(auto) find(const IpAddr& ip) const { return 
executor_handles_.find(ip); }
+    decltype(auto) end() const { return executor_handles_.end(); }
 
    private:
-    // Heap to determine next backend.
-    AssignmentHeap backend_heap_;
-    // Maps backend IPs to handles in the heap.
-    BackendHandleMap backend_handles_;
+    // Heap to determine next executor.
+    AssignmentHeap executor_heap_;
+    // Maps executor IPs to handles in the heap.
+    ExecutorHandleMap executor_handles_;
   };
 
   /// Class to store context information on assignments during scheduling. It 
is
-  /// initialized with a copy of the global backend information and assigns a 
random rank
-  /// to each backend to break ties in cases where multiple backends have been 
assigned
-  /// the same number or bytes. It tracks the number of assigned bytes, which 
backends
+  /// initialized with a copy of the global executor information and assigns a 
random rank
+  /// to each executor to break ties in cases where multiple executors have 
been assigned
+  /// the same number or bytes. It tracks the number of assigned bytes, which 
executors
   /// have already been used, etc. Objects of this class are created in
   /// ComputeScanRangeAssignment() and thus don't need to be thread safe.
   class AssignmentCtx {
    public:
-    AssignmentCtx(const BackendConfig& backend_config, IntCounter* 
total_assignments,
+    AssignmentCtx(const BackendConfig& executor_config, IntCounter* 
total_assignments,
         IntCounter* total_local_assignments);
 
     /// Among hosts in 'data_locations', select the one with the minimum 
number of
-    /// assigned bytes. If backends have been assigned equal amounts of work 
and
-    /// 'break_ties_by_rank' is true, then the backend rank is used to break 
ties.
-    /// Otherwise the first backend according to their order in 
'data_locations' is
+    /// assigned bytes. If executors have been assigned equal amounts of work 
and
+    /// 'break_ties_by_rank' is true, then the executor rank is used to break 
ties.
+    /// Otherwise the first executor according to their order in 
'data_locations' is
     /// selected.
-    const IpAddr* SelectLocalBackendHost(
+    const IpAddr* SelectLocalExecutor(
         const std::vector<IpAddr>& data_locations, bool break_ties_by_rank);
 
-    /// Select a backend host for a remote read. If there are unused backend 
hosts, then
+    /// Select an executor for a remote read. If there are unused executor 
hosts, then
     /// those will be preferred. Otherwise the one with the lowest number of 
assigned
-    /// bytes is picked. If backends have been assigned equal amounts of work, 
then the
-    /// backend rank is used to break ties.
-    const IpAddr* SelectRemoteBackendHost();
+    /// bytes is picked. If executors have been assigned equal amounts of 
work, then the
+    /// executor rank is used to break ties.
+    const IpAddr* SelectRemoteExecutor();
 
-    /// Return the next backend that has not been assigned to. This assumes 
that a
-    /// returned backend will also be assigned to. The caller must make sure 
that
-    /// HasUnusedBackends() is true.
-    const IpAddr* GetNextUnusedBackendAndIncrement();
+    /// Return the next executor that has not been assigned to. This assumes 
that a
+    /// returned executor will also be assigned to. The caller must make sure 
that
+    /// HasUnusedExecutors() is true.
+    const IpAddr* GetNextUnusedExecutorAndIncrement();
 
-    /// Pick a backend in round-robin fashion from multiple backends on a 
single host.
-    void SelectBackendOnHost(const IpAddr& backend_ip, TBackendDescriptor* 
backend);
+    /// Pick an executor in round-robin fashion from multiple executors on a 
single host.
+    void SelectExecutorOnHost(const IpAddr& executor_ip, TBackendDescriptor* 
executor);
 
     /// Build a new TScanRangeParams object and append it to the assignment 
list for the
-    /// tuple (backend, node_id) in 'assignment'. Also, update 
assignment_heap_ and
+    /// tuple (executor, node_id) in 'assignment'. Also, update 
assignment_heap_ and
     /// assignment_byte_counters_, increase the counters 'total_assignments_' 
and
     /// 'total_local_assignments_'. 'scan_range_locations' contains 
information about the
     /// scan range and its replica locations.
-    void RecordScanRangeAssignment(const TBackendDescriptor& backend, 
PlanNodeId node_id,
+    void RecordScanRangeAssignment(const TBackendDescriptor& executor, 
PlanNodeId node_id,
         const vector<TNetworkAddress>& host_list,
         const TScanRangeLocationList& scan_range_locations,
         FragmentScanRangeAssignment* assignment);
 
-    const BackendConfig& backend_config() const { return backend_config_; }
+    const BackendConfig& executor_config() const { return executors_config_; }
 
     /// Print the assignment and statistics to VLOG_FILE.
     void PrintAssignment(const FragmentScanRangeAssignment& assignment);
@@ -225,26 +225,26 @@ class Scheduler {
       int64_t cached_bytes = 0;
     };
 
-    /// Used to look up hostnames to IP addresses and IP addresses to backend.
-    const BackendConfig& backend_config_;
+    /// Used to look up hostnames to IP addresses and IP addresses to 
executors.
+    const BackendConfig& executors_config_;
 
-    // Addressable heap to select remote backends from. Elements are ordered 
by the number
-    // of already assigned bytes (and a random rank to break ties).
+    // Addressable heap to select remote executors from. Elements are ordered 
by the
+    // number of already assigned bytes (and a random rank to break ties).
     AddressableAssignmentHeap assignment_heap_;
 
-    /// Store a random rank per backend host to break ties between otherwise 
equivalent
+    /// Store a random rank per executor host to break ties between otherwise 
equivalent
     /// replicas (e.g., those having the same number of assigned bytes).
-    boost::unordered_map<IpAddr, int> random_backend_rank_;
+    boost::unordered_map<IpAddr, int> random_executor_rank_;
 
-    // Index into random_backend_order. It points to the first unused backend 
and is used
-    // to select unused backends and inserting them into the assignment_heap_.
-    int first_unused_backend_idx_;
+    /// Index into random_executor_order. It points to the first unused 
executor and is
+    /// used to select unused executors and inserting them into the 
assignment_heap_.
+    int first_unused_executor_idx_;
 
-    /// Store a random permutation of backend hosts to select backends from.
-    std::vector<IpAddr> random_backend_order_;
+    /// Store a random permutation of executor hosts to select executors from.
+    std::vector<IpAddr> random_executor_order_;
 
-    /// Track round robin information per backend host.
-    NextBackendPerHost next_backend_per_host_;
+    /// Track round robin information per executor host.
+    NextExecutorPerHost next_executor_per_host_;
 
     /// Track number of assigned bytes that have been read from cache, 
locally, or
     /// remotely.
@@ -254,39 +254,40 @@ class Scheduler {
     IntCounter* total_assignments_;
     IntCounter* total_local_assignments_;
 
-    /// Return whether there are backends that have not been assigned a scan 
range.
-    bool HasUnusedBackends() const;
+    /// Return whether there are executors that have not been assigned a scan 
range.
+    bool HasUnusedExecutors() const;
 
-    /// Return the rank of a backend.
-    int GetBackendRank(const IpAddr& ip) const;
+    /// Return the rank of an executor.
+    int GetExecutorRank(const IpAddr& ip) const;
   };
 
-  /// The scheduler's backend configuration. When receiving changes to the 
backend
+  /// The scheduler's executors configuration. When receiving changes to the 
executors
   /// configuration from the statestore we will make a copy of the stored 
object, apply
   /// the updates to the copy and atomically swap the contents of this 
pointer. Each plan
-  /// node creates a read-only copy of the scheduler's current backend_config_ 
to use
+  /// node creates a read-only copy of the scheduler's current 
executors_config_ to use
   /// during scheduling.
-  BackendConfigPtr backend_config_;
+  ExecutorsConfigPtr executors_config_;
 
   /// A backend configuration which only contains the local backend. It is 
used when
   /// scheduling on the coordinator.
   BackendConfig coord_only_backend_config_;
 
-  /// Protect access to backend_config_ which might otherwise be updated 
asynchronously
+  /// Protect access to executors_config_ which might otherwise be updated 
asynchronously
   /// with respect to reads.
-  mutable boost::mutex backend_config_lock_;
+  mutable boost::mutex executors_config_lock_;
 
-  /// Total number of scan ranges assigned to backends during the lifetime of 
the
+  /// Total number of scan ranges assigned to executors during the lifetime of 
the
   /// scheduler.
   int64_t num_assignments_;
 
-  /// Map from unique backend id to TBackendDescriptor. Used to track the 
known backends
-  /// from the statestore. It's important to track both the backend ID as well 
as the
-  /// TBackendDescriptor so we know what is being removed in a given update.
-  /// Locking of this map is not needed since it should only be read/modified 
from
-  /// within the UpdateMembership() function.
+  /// Map from unique backend ID to TBackendDescriptor. The
+  /// {backend ID, TBackendDescriptor} pairs represent the 
IMPALA_MEMBERSHIP_TOPIC
+  /// {key, value} pairs of known executors retrieved from the statestore. 
It's important
+  /// to track both the backend ID as well as the TBackendDescriptor so we 
know what is
+  /// being removed in a given update. Locking of this map is not needed since 
it should
+  /// only be read/modified from within the UpdateMembership() function.
   typedef boost::unordered_map<std::string, TBackendDescriptor> BackendIdMap;
-  BackendIdMap current_membership_;
+  BackendIdMap current_executors_;
 
   /// MetricGroup subsystem access
   MetricGroup* metrics_;
@@ -314,30 +315,22 @@ class Scheduler {
   /// Initialization metric
   BooleanProperty* initialized_;
 
-  /// Current number of backends
+  /// Current number of executors
   IntGauge* num_fragment_instances_metric_;
 
   /// Used for user-to-pool resolution and looking up pool configurations. Not 
owned by
   /// us.
   RequestPoolService* request_pool_service_;
 
-  /// Helper methods to access backend_config_ (the shared_ptr, not its 
contents),
-  /// protecting the access with backend_config_lock_.
-  BackendConfigPtr GetBackendConfig() const;
-  void SetBackendConfig(const BackendConfigPtr& backend_config);
+  /// Helper methods to access executors_config_ (the shared_ptr, not its 
contents),
+  /// protecting the access with executors_config_lock_.
+  ExecutorsConfigPtr GetExecutorsConfig() const;
+  void SetExecutorsConfig(const ExecutorsConfigPtr& executors_config);
 
   /// Called asynchronously when an update is received from the subscription 
manager
   void UpdateMembership(const StatestoreSubscriber::TopicDeltaMap& 
incoming_topic_deltas,
       std::vector<TTopicDelta>* subscriber_topic_updates);
 
-  /// Webserver callback that produces a list of known backends.
-  /// Example output:
-  /// "backends": [
-  ///     "henry-metrics-pkg-cdh5.ent.cloudera.com:22000"
-  ///              ],
-  void BackendsUrlCallback(
-      const Webserver::ArgumentMap& args, rapidjson::Document* document);
-
   /// Determine the pool for a user and query options via 
request_pool_service_.
   Status GetRequestPool(const std::string& user, const TQueryOptions& 
query_options,
       std::string* pool) const;
@@ -356,18 +349,19 @@ class Scheduler {
   /// Otherwise the assignment is computed for each scan range as follows:
   ///
   /// Scan ranges refer to data, which is usually replicated on multiple 
hosts. All scan
-  /// ranges where one of the replica hosts also runs an impala backend are 
processed
-  /// first. If more than one of the replicas run an impala backend, then the 
'memory
-  /// distance' of each backend is considered. The concept of memory distance 
reflects the
-  /// cost of moving data into the processing backend's main memory. Reading 
from cached
-  /// replicas is generally considered less costly than reading from a local 
disk, which
-  /// in turn is cheaper than reading data from a remote node. If multiple 
backends of the
-  /// same memory distance are found, then the one with the least amount of 
previously
-  /// assigned work is picked, thus aiming to distribute the work as evenly as 
possible.
+  /// ranges where one of the replica hosts also runs an impala executor are 
processed
+  /// first. If more than one of the replicas run an impala executor, then the 
'memory
+  /// distance' of each executor is considered. The concept of memory distance 
reflects
+  /// the cost of moving data into the processing executor's main memory. 
Reading from
+  /// cached replicas is generally considered less costly than reading from a 
local disk,
+  /// which in turn is cheaper than reading data from a remote node. If 
multiple executors
+  /// of the same memory distance are found, then the one with the least 
amount of
+  /// previously assigned work is picked, thus aiming to distribute the work 
as evenly as
+  /// possible.
   ///
-  /// Finally, scan ranges are considered which do not have an impalad backend 
running on
+  /// Finally, scan ranges are considered which do not have an impalad 
executor running on
   /// any of their data nodes. They will be load-balanced by assigned bytes 
across all
-  /// backends
+  /// executors.
   ///
   /// The resulting assignment is influenced by the following query options:
   ///
@@ -384,9 +378,9 @@ class Scheduler {
   ///   false.
   ///
   /// schedule_random_replica:
-  ///   When equivalent backends with a memory distance of DISK_LOCAL are 
found for a scan
-  ///   range (same memory distance, same amount of assigned work), then the 
first one
-  ///   will be picked deterministically. This aims to make better use of OS 
buffer
+  ///   When equivalent executors with a memory distance of DISK_LOCAL are 
found for a
+  ///   scan range (same memory distance, same amount of assigned work), then 
the first
+  ///   one will be picked deterministically. This aims to make better use of 
OS buffer
   ///   caches, but can lead to performance bottlenecks on individual hosts. 
Setting this
   ///   option to true will randomly change the order in which equivalent 
replicas are
   ///   picked for different plan nodes. This helps to compute a more even 
assignment,
@@ -396,17 +390,17 @@ class Scheduler {
   ///
   /// The method takes the following parameters:
   ///
-  /// backend_config:          Backend configuration to use for scheduling.
+  /// executor_config:         Executor configuration to use for scheduling.
   /// node_id:                 ID of the plan node.
   /// node_replica_preference: Query hint equivalent to replica_preference.
   /// node_random_replica:     Query hint equivalent to 
schedule_random_replica.
-  /// locations:               List of scan ranges to be assigned to backends.
+  /// locations:               List of scan ranges to be assigned to executors.
   /// host_list:               List of hosts, into which 'locations' will 
index.
   /// exec_at_coord:           Whether to schedule all scan ranges on the 
coordinator.
   /// query_options:           Query options for the current query.
   /// timer:                   Tracks execution time of 
ComputeScanRangeAssignment.
   /// assignment:              Output parameter, to which new assignments will 
be added.
-  Status ComputeScanRangeAssignment(const BackendConfig& backend_config,
+  Status ComputeScanRangeAssignment(const BackendConfig& executor_config,
       PlanNodeId node_id, const TReplicaPreference::type* 
node_replica_preference,
       bool node_random_replica, const std::vector<TScanRangeLocationList>& 
locations,
       const std::vector<TNetworkAddress>& host_list, bool exec_at_coord,

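The scheduler doc comment above describes a two-level preference: among the executors holding a replica, the smallest memory distance wins, and ties are broken by the least amount of previously assigned work. A minimal sketch of that selection rule, using hypothetical names (MemoryDistance, ExecutorCandidate, PickExecutor) rather than the actual Scheduler/BackendConfig interfaces:

#include <cstdint>
#include <vector>

// Ordered by increasing cost of moving a replica's data into the executor's
// main memory (the 'memory distance' described above). Hypothetical type.
enum class MemoryDistance { CACHED_LOCAL = 0, DISK_LOCAL = 1, REMOTE = 2 };

struct ExecutorCandidate {
  MemoryDistance distance;   // Distance from this executor to the replica.
  int64_t assigned_bytes;    // Work already assigned to this executor.
  int executor_idx;          // Index into the executor configuration.
};

// Returns the index of the preferred executor for one scan range: smallest
// memory distance first, ties broken by least previously assigned work.
// Assumes 'candidates' is non-empty.
int PickExecutor(const std::vector<ExecutorCandidate>& candidates) {
  int best = 0;
  for (int i = 1; i < static_cast<int>(candidates.size()); ++i) {
    const ExecutorCandidate& c = candidates[i];
    const ExecutorCandidate& b = candidates[best];
    if (c.distance < b.distance ||
        (c.distance == b.distance && c.assigned_bytes < b.assigned_bytes)) {
      best = i;
    }
  }
  return candidates[best].executor_idx;
}

With schedule_random_replica=true, ties among DISK_LOCAL candidates would instead be broken randomly; the sketch keeps the deterministic default.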
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/service/impala-http-handler.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.cc 
b/be/src/service/impala-http-handler.cc
index ff3c7bc..b9f3382 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -76,6 +76,9 @@ static Status ParseIdFromArguments(const 
Webserver::ArgumentMap& args, TUniqueId
 void ImpalaHttpHandler::RegisterHandlers(Webserver* webserver) {
   DCHECK(webserver != NULL);
 
+  webserver->RegisterUrlCallback("/backends", "backends.tmpl",
+      MakeCallback(this, &ImpalaHttpHandler::BackendsHandler));
+
   webserver->RegisterUrlCallback("/hadoop-varz", "hadoop-varz.tmpl",
       MakeCallback(this, &ImpalaHttpHandler::HadoopVarzHandler));
 
@@ -777,3 +780,19 @@ void ImpalaHttpHandler::QuerySummaryHandler(bool 
include_json_plan, bool include
   Value json_id(PrintId(query_id).c_str(), document->GetAllocator());
   document->AddMember("query_id", json_id, document->GetAllocator());
 }
+
+void ImpalaHttpHandler::BackendsHandler(const Webserver::ArgumentMap& args,
+    Document* document) {
+  Value backends_list(kArrayType);
+  for (const auto& entry : server_->GetKnownBackends()) {
+    TBackendDescriptor backend = entry.second;
+    Value backend_obj(kObjectType);
+    Value str(TNetworkAddressToString(backend.address).c_str(), 
document->GetAllocator());
+    backend_obj.AddMember("address", str, document->GetAllocator());
+    backend_obj.AddMember("is_coordinator", backend.is_coordinator,
+        document->GetAllocator());
+    backend_obj.AddMember("is_executor", backend.is_executor, 
document->GetAllocator());
+    backends_list.PushBack(backend_obj, document->GetAllocator());
+  }
+  document->AddMember("backends", backends_list, document->GetAllocator());
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/service/impala-http-handler.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.h 
b/be/src/service/impala-http-handler.h
index 7be4729..485f6db 100644
--- a/be/src/service/impala-http-handler.h
+++ b/be/src/service/impala-http-handler.h
@@ -144,6 +144,16 @@ class ImpalaHttpHandler {
   /// QueryStateHandler().
   void QueryStateToJson(const ImpalaServer::QueryStateRecord& record,
       rapidjson::Value* value, rapidjson::Document* document);
+
+  /// Json callback for /backends, which prints a table of known backends.
+  /// "backends" : [
+  /// {
+  ///   "address": "localhost:21000",
+  ///   "is_coordinator": true,
+  ///   "is_executor": false
+  ///   }
+  /// ]
+  void BackendsHandler(const Webserver::ArgumentMap& args, 
rapidjson::Document* document);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 80c1507..86cb0c9 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -86,6 +86,7 @@
 using boost::adopt_lock_t;
 using boost::algorithm::is_any_of;
 using boost::algorithm::istarts_with;
+using boost::algorithm::join;
 using boost::algorithm::replace_all_copy;
 using boost::algorithm::split;
 using boost::algorithm::token_compress_on;
@@ -96,6 +97,7 @@ using boost::uuids::uuid;
 using namespace apache::thrift;
 using namespace boost::posix_time;
 using namespace beeswax;
+using namespace rapidjson;
 using namespace strings;
 
 DECLARE_int32(be_port);
@@ -178,8 +180,9 @@ DEFINE_int32(idle_query_timeout, 0, "The time, in seconds, 
that a query may be i
     " the maximum allowable timeout.");
 
 DEFINE_bool(is_coordinator, true, "If true, this Impala daemon can accept and 
coordinate "
-    "queries from clients. If false, this daemon will only execute query 
fragments, and "
-    "will refuse client connections.");
+    "queries from clients. If false, it will refuse client connections.");
+DEFINE_bool(is_executor, true, "If true, this Impala daemon will execute query 
"
+    "fragments.");
 
 // TODO: Remove for Impala 3.0.
 DEFINE_string(local_nodemanager_url, "", "Deprecated");
@@ -370,6 +373,7 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
       bind<void>(&ImpalaServer::ExpireQueries, this)));
 
   is_coordinator_ = FLAGS_is_coordinator;
+  is_executor_ = FLAGS_is_executor;
   exec_env_->SetImpalaServer(this);
 }
 
@@ -405,6 +409,11 @@ Status ImpalaServer::LogLineageRecord(const 
QueryExecState& query_exec_state) {
 }
 
 bool ImpalaServer::IsCoordinator() { return is_coordinator_; }
+bool ImpalaServer::IsExecutor() { return is_executor_; }
+
+const ImpalaServer::BackendDescriptorMap& ImpalaServer::GetKnownBackends() {
+  return known_backends_;
+}
 
 bool ImpalaServer::IsLineageLoggingEnabled() {
   return !FLAGS_lineage_event_log_dir.empty();
@@ -1585,6 +1594,8 @@ void ImpalaServer::AddLocalBackendToStatestore(
   if (known_backends_.find(local_backend_id) != known_backends_.end()) return;
 
   TBackendDescriptor local_backend_descriptor;
+  local_backend_descriptor.__set_is_coordinator(FLAGS_is_coordinator);
+  local_backend_descriptor.__set_is_executor(FLAGS_is_executor);
   local_backend_descriptor.__set_address(
       MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port));
   IpAddr ip;
@@ -1889,6 +1900,9 @@ Status CreateImpalaServer(ExecEnv* exec_env, int 
beeswax_port, int hs2_port, int
   DCHECK((beeswax_port == 0) == (beeswax_server == nullptr));
   DCHECK((hs2_port == 0) == (hs2_server == nullptr));
   DCHECK((be_port == 0) == (be_server == nullptr));
+  if (!FLAGS_is_coordinator && !FLAGS_is_executor) {
+    return Status("Impala server needs to have a role (EXECUTOR, 
COORDINATOR)");
+  }
 
   impala_server->reset(new ImpalaServer(exec_env));
 
@@ -1910,8 +1924,10 @@ Status CreateImpalaServer(ExecEnv* exec_env, int 
beeswax_port, int hs2_port, int
 
     LOG(INFO) << "ImpalaInternalService listening on " << be_port;
   }
+
   if (!FLAGS_is_coordinator) {
-    LOG(INFO) << "Started worker Impala server on "
+
+    LOG(INFO) << "Started executor Impala server on "
               << ExecEnv::GetInstance()->backend_address();
     return Status::OK();
   }
@@ -1961,7 +1977,7 @@ Status CreateImpalaServer(ExecEnv* exec_env, int 
beeswax_port, int hs2_port, int
     LOG(INFO) << "Impala HiveServer2 Service listening on " << hs2_port;
   }
 
-  LOG(INFO) << "Started coordinator Impala server on "
+  LOG(INFO) << "Started coordinator/executor Impala server on "
             << ExecEnv::GetInstance()->backend_address();
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 45e8080..34e204d 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -273,6 +273,12 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
   /// Returns true if this is a coordinator, false otherwise.
   bool IsCoordinator();
 
+  /// Returns true if this is an executor, false otherwise.
+  bool IsExecutor();
+
+  typedef boost::unordered_map<std::string, TBackendDescriptor> 
BackendDescriptorMap;
+  const BackendDescriptorMap& GetKnownBackends();
+
   /// The prefix of audit event log filename.
   static const string AUDIT_EVENT_LOG_FILE_PREFIX;
 
@@ -853,16 +859,17 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
       QueryLocations;
   QueryLocations query_locations_;
 
-  /// A map from unique backend ID to the corresponding TBackendDescriptor of 
that backend.
-  /// Used to track membership updates from the statestore so queries can be 
cancelled
-  /// when a backend is removed. It's not enough to just cancel fragments that 
are running
-  /// based on the deletions mentioned in the most recent statestore 
heartbeat; sometimes
-  /// cancellations are skipped and the statestore, at its discretion, may 
send only
-  /// a delta of the current membership so we need to compute any deletions.
+  /// A map from unique backend ID to the corresponding TBackendDescriptor of 
that
+  /// backend. Used to track membership updates from the statestore so queries 
can be
+  /// cancelled when a backend is removed. It's not enough to just cancel 
fragments that
+  /// are running based on the deletions mentioned in the most recent 
statestore
+  /// heartbeat; sometimes cancellations are skipped and the statestore, at its
+  /// discretion, may send only a delta of the current membership so we need 
to compute
+  /// any deletions.
   /// TODO: Currently there are multiple locations where cluster membership is 
tracked,
-  /// here and in the scheduler. This should be consolidated so there is a 
single component
-  /// (the scheduler?) that tracks this information and calls other interested 
components.
-  typedef boost::unordered_map<std::string, TBackendDescriptor> 
BackendDescriptorMap;
+  /// here and in the scheduler. This should be consolidated so there is a 
single
+  /// component (the scheduler?) that tracks this information and calls other 
interested
+  /// components.
   BackendDescriptorMap known_backends_;
 
   /// Generate unique session id for HiveServer2 session
@@ -956,6 +963,9 @@ class ImpalaServer : public ImpalaServiceIf, public 
ImpalaHiveServer2ServiceIf,
   /// True if this ImpalaServer can accept client connections and coordinate
   /// queries.
   bool is_coordinator_;
+
+  /// True if this ImpalaServer can execute query fragments.
+  bool is_executor_;
 };
 
 /// Create an ImpalaServer and Thrift servers.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/util/network-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc
index 0a722cc..4382e99 100644
--- a/be/src/util/network-util.cc
+++ b/be/src/util/network-util.cc
@@ -154,6 +154,8 @@ TBackendDescriptor MakeBackendDescriptor(const Hostname& 
hostname, const IpAddr&
   TBackendDescriptor be_desc;
   be_desc.address = MakeNetworkAddress(hostname, port);
   be_desc.ip_address = ip;
+  be_desc.is_coordinator = true;
+  be_desc.is_executor = true;
   return be_desc;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/util/webserver.cc
----------------------------------------------------------------------
diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc
index 3b7c4f9..3e587a2 100644
--- a/be/src/util/webserver.cc
+++ b/be/src/util/webserver.cc
@@ -56,6 +56,7 @@ typedef sig_t sighandler_t;
 #endif
 
 using boost::algorithm::is_any_of;
+using boost::algorithm::join;
 using boost::algorithm::split;
 using boost::algorithm::trim_right;
 using boost::algorithm::to_lower;
@@ -193,10 +194,11 @@ void Webserver::RootHandler(const ArgumentMap& args, 
Document* document) {
 
   ExecEnv* env = ExecEnv::GetInstance();
   if (env == nullptr || env->impala_server() == nullptr) return;
-  string mode = (env->impala_server()->IsCoordinator()) ?
-      "Coordinator + Executor" : "Executor";
-  Value impala_server_mode(mode.c_str(), document->GetAllocator());
-  document->AddMember("impala_server_mode", impala_server_mode, 
document->GetAllocator());
+  document->AddMember("impala_server_mode", true, document->GetAllocator());
+  document->AddMember("is_coordinator", env->impala_server()->IsCoordinator(),
+      document->GetAllocator());
+  document->AddMember("is_executor", env->impala_server()->IsExecutor(),
+      document->GetAllocator());
 }
 
 void Webserver::ErrorHandler(const ArgumentMap& args, Document* document) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/bin/start-impala-cluster.py
----------------------------------------------------------------------
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index 37671e3..2b42959 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -38,6 +38,10 @@ parser.add_option("-s", "--cluster_size", type="int", 
dest="cluster_size", defau
                   help="Size of the cluster (number of impalad instances to 
start).")
 parser.add_option("-c", "--num_coordinators", type="int", 
dest="num_coordinators",
                   default=3, help="Number of coordinators.")
+parser.add_option("--use_exclusive_coordinators", 
dest="use_exclusive_coordinators",
+                  action="store_true", default=False, help="If true, 
coordinators only "
+                  "coordinate queries and execute coordinator fragments. If 
false, "
+                  "coordinators also act as executors.")
 parser.add_option("--build_type", dest="build_type", default= 'latest',
                   help="Build type to use - debug / release / latest")
 parser.add_option("--impalad_args", dest="impalad_args", action="append", 
type="string",
@@ -50,7 +54,7 @@ parser.add_option("--catalogd_args", dest="catalogd_args", 
action="append",
                   type="string", default=[],
                   help="Additional arguments to pass to the Catalog Service at 
startup")
 parser.add_option("--kill", "--kill_only", dest="kill_only", 
action="store_true",
-                  default=False, help="Instead of starting the cluster, just 
kill all"\
+                  default=False, help="Instead of starting the cluster, just 
kill all"
                   " the running impalads and the statestored.")
 parser.add_option("--force_kill", dest="force_kill", action="store_true", 
default=False,
                   help="Force kill impalad and statestore processes.")
@@ -67,7 +71,7 @@ parser.add_option('--max_log_files', 
default=DEFAULT_IMPALA_MAX_LOG_FILES,
 parser.add_option("-v", "--verbose", dest="verbose", action="store_true", 
default=False,
                   help="Prints all output to stderr/stdout.")
 parser.add_option("--wait_for_cluster", dest="wait_for_cluster", 
action="store_true",
-                  default=False, help="Wait until the cluster is ready to 
accept "\
+                  default=False, help="Wait until the cluster is ready to 
accept "
                   "queries before returning.")
 parser.add_option("--log_level", type="int", dest="log_level", default=1,
                    help="Set the impalad backend logging level")
@@ -206,7 +210,10 @@ def build_jvm_args(instance_num):
   BASE_JVM_DEBUG_PORT = 30000
   return JVM_ARGS % (BASE_JVM_DEBUG_PORT + instance_num, options.jvm_args)
 
-def start_impalad_instances(cluster_size, num_coordinators):
+def start_impalad_instances(cluster_size, num_coordinators, 
use_exclusive_coordinators):
+  """Start 'cluster_size' impalad instances. The first 'num_coordinator' 
instances will
+    act as coordinators. 'use_exclusive_coordinators' specifies whether the 
coordinators
+    will only execute coordinator fragments."""
   if cluster_size == 0:
     # No impalad instances should be started.
     return
@@ -244,6 +251,9 @@ def start_impalad_instances(cluster_size, num_coordinators):
 
     if i >= num_coordinators:
       args = "-is_coordinator=false %s" % (args)
+    elif use_exclusive_coordinators:
+      # Coordinator instance that doesn't execute non-coordinator fragments
+      args = "-is_executor=false %s" % (args)
 
     stderr_log_file_path = os.path.join(options.log_dir, '%s-error.log' % 
service_name)
     exec_impala_process(IMPALAD_PATH, args, stderr_log_file_path)
@@ -285,9 +295,9 @@ def 
wait_for_cluster_web(timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
   # impalad processes may take a while to come up.
   wait_for_impala_process_count(impala_cluster)
   for impalad in impala_cluster.impalads:
+    impalad.service.wait_for_num_known_live_backends(options.cluster_size,
+        timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2)
     if impalad._get_arg_value('is_coordinator', default='true') == 'true':
-      impalad.service.wait_for_num_known_live_backends(options.cluster_size,
-          timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2)
       wait_for_catalog(impalad, 
timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS)
 
 def wait_for_catalog(impalad, timeout_in_seconds):
@@ -337,6 +347,10 @@ if __name__ == "__main__":
     print 'Please specify a valid number of coordinators > 0'
     sys.exit(1)
 
+  if options.use_exclusive_coordinators and options.num_coordinators >= 
options.cluster_size:
+    print 'Cannot start an Impala cluster with no executors'
+    sys.exit(1)
+
   if not os.path.isdir(options.log_dir):
     print 'Log dir does not exist or is not a directory: %s' % options.log_dir
     sys.exit(1)
@@ -383,7 +397,8 @@ if __name__ == "__main__":
       if not options.restart_impalad_only:
         start_statestore()
         start_catalogd()
-      start_impalad_instances(options.cluster_size, options.num_coordinators)
+      start_impalad_instances(options.cluster_size, options.num_coordinators,
+          options.use_exclusive_coordinators)
       # Sleep briefly to reduce log spam: the cluster takes some time to start 
up.
       sleep(3)
       wait_for_cluster()

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/common/thrift/StatestoreService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/StatestoreService.thrift 
b/common/thrift/StatestoreService.thrift
index 1677635..90400b2 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -47,7 +47,8 @@ struct TPoolStats {
 // Structure serialised in the Impala backend topic. Each Impalad
 // constructs one TBackendDescriptor, and registers it in the backend
 // topic. Impalads subscribe to this topic to learn of the location of
-// all other Impalads in the cluster.
+// all other Impalads in the cluster. Impalads can act as coordinators, 
executors or
+// both.
 struct TBackendDescriptor {
   // Network address of the Impala service on this backend
   1: required Types.TNetworkAddress address;
@@ -56,11 +57,17 @@ struct TBackendDescriptor {
   // cost of resolution at every Impalad (since IP addresses are needed for 
scheduling)
   2: required string ip_address;
 
+  // True if this is a coordinator node
+  3: required bool is_coordinator;
+
+  // True if this is an executor node
+  4: required bool is_executor;
+
   // The address of the debug HTTP server
-  3: optional Types.TNetworkAddress debug_http_address;
+  5: optional Types.TNetworkAddress debug_http_address;
 
   // True if the debug webserver is secured (for correctly generating links)
-  4: optional bool secure_webserver;
+  6: optional bool secure_webserver;
 }
 
 // Description of a single entry in a topic

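With the new role flags in TBackendDescriptor, a subscriber to the backend topic can restrict scheduling to executor backends only. A minimal sketch under assumed names (BackendInfo and FilterExecutors are illustrative stand-ins, not the generated Thrift type or the scheduler's actual membership code):

#include <string>
#include <vector>

// Illustrative stand-in for the generated TBackendDescriptor; only the
// fields relevant to role filtering are shown.
struct BackendInfo {
  std::string address;
  bool is_coordinator;
  bool is_executor;
};

// Keeps only the backends that may execute query fragments; a scheduler
// would build its executor configuration from this subset.
std::vector<BackendInfo> FilterExecutors(const std::vector<BackendInfo>& all) {
  std::vector<BackendInfo> executors;
  for (const BackendInfo& be : all) {
    if (be.is_executor) executors.push_back(be);
  }
  return executors;
}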
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/tests/common/custom_cluster_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/custom_cluster_test_suite.py 
b/tests/common/custom_cluster_test_suite.py
index b19b5d2..377b38e 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -108,13 +108,15 @@ class CustomClusterTestSuite(ImpalaTestSuite):
 
   @classmethod
   def _start_impala_cluster(cls, options, log_dir=os.getenv('LOG_DIR', 
"/tmp/"),
-      cluster_size=CLUSTER_SIZE, num_coordinators=NUM_COORDINATORS, 
log_level=1):
+      cluster_size=CLUSTER_SIZE, num_coordinators=NUM_COORDINATORS,
+      use_exclusive_coordinators=False, log_level=1):
     cls.impala_log_dir = log_dir
     cmd = [os.path.join(IMPALA_HOME, 'bin/start-impala-cluster.py'),
            '--cluster_size=%d' % cluster_size,
            '--num_coordinators=%d' % num_coordinators,
            '--log_dir=%s' % log_dir,
            '--log_level=%s' % log_level]
+    if use_exclusive_coordinators: cmd.append("--use_exclusive_coordinators")
     try:
       check_call(cmd + options, close_fds=True)
     finally:
@@ -125,8 +127,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       raise Exception("statestored was not found")
     statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS, timeout=60)
     for impalad in cls.cluster.impalads:
-      if impalad._get_arg_value('is_coordinator', default='true') == 'true':
-        impalad.service.wait_for_num_known_live_backends(CLUSTER_SIZE, 
timeout=60)
+      impalad.service.wait_for_num_known_live_backends(CLUSTER_SIZE, 
timeout=60)
 
   def assert_impalad_log_contains(self, level, line_regex, expected_count=1):
     """

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/tests/custom_cluster/test_coordinators.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_coordinators.py 
b/tests/custom_cluster/test_coordinators.py
index 6010404..cac4512 100644
--- a/tests/custom_cluster/test_coordinators.py
+++ b/tests/custom_cluster/test_coordinators.py
@@ -84,3 +84,37 @@ class TestCoordinators(CustomClusterTestSuite):
       assert num_tbls == 0
       client1.close()
       client2.close()
+
+  @pytest.mark.execute_serially
+  def test_single_coordinator_cluster_config(self):
+    """Test a cluster configuration with a single coordinator."""
+
+    def exec_and_verify_num_executors(expected_num_of_executors):
+      """Connects to the coordinator node, runs a query and verifies that 
certain
+        operators are executed on 'expected_num_of_executors' nodes."""
+      coordinator = self.cluster.impalads[0]
+      try:
+        client = coordinator.service.create_beeswax_client()
+        assert client is not None
+        query = "select count(*) from functional.alltypesagg"
+        result = self.execute_query_expect_success(client, query)
+        # Verify that SCAN and AGG are executed on the expected number of
+        # executor nodes
+        for rows in result.exec_summary:
+          if rows['operator'] == '00:SCAN HDFS':
+            assert rows['num_hosts'] == expected_num_of_executors
+          elif rows['operator'] == '01:AGGREGATE':
+            assert rows['num_hosts'] == expected_num_of_executors
+      finally:
+        client.close()
+
+    # Cluster config where the coordinator can execute query fragments
+    self._start_impala_cluster([], cluster_size=3, num_coordinators=1,
+        use_exclusive_coordinators=False)
+    exec_and_verify_num_executors(3)
+    # Stop the cluster
+    self._stop_impala_cluster()
+    # Cluster config where the coordinator can only execute coordinator 
fragments
+    self._start_impala_cluster([], cluster_size=3, num_coordinators=1,
+        use_exclusive_coordinators=True)
+    exec_and_verify_num_executors(2)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/www/backends.tmpl
----------------------------------------------------------------------
diff --git a/www/backends.tmpl b/www/backends.tmpl
index cef77b7..20cfa11 100644
--- a/www/backends.tmpl
+++ b/www/backends.tmpl
@@ -23,13 +23,17 @@ under the License.
 <table id="backends" class='table table-hover table-bordered'>
   <thead>
     <tr>
-      <th>Backend</th>
+      <th>Address</th>
+      <th>Coordinator</th>
+      <th>Executor</th>
     </tr>
   </thead>
   <tbody>
     {{#backends}}
     <tr>
-      <td>{{.}}</td>
+      <td>{{address}}</td>
+      <td>{{is_coordinator}}</td>
+      <td>{{is_executor}}</td>
     </tr>
     {{/backends}}
   </tbody>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/www/root.tmpl
----------------------------------------------------------------------
diff --git a/www/root.tmpl b/www/root.tmpl
index 40d448a..d9c6d2a 100644
--- a/www/root.tmpl
+++ b/www/root.tmpl
@@ -19,7 +19,8 @@ under the License.
 {{! Template for / }}
 {{>www/common-header.tmpl}}
   {{?impala_server_mode}}
-  <h2>Impala Server Mode: {{impala_server_mode}}</h2>
+  <h2>Impala Server Mode: {{?is_coordinator}}Coordinator{{/is_coordinator}} 
+    {{?is_executor}}Executor{{/is_executor}}</h2>
   {{/impala_server_mode}}
 
   <h2>Vers<span id="v">i</span>on</h2>

Reply via email to