IMPALA-5147: Add the ability to exclude hosts from query execution

This commit introduces a new startup option, 'is_executor', that controls whether an impalad process can execute query fragments. Only hosts started with this option enabled are included in the scheduler's backend configuration and are therefore considered in scheduling decisions.
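As an illustration, a minimal sketch of how the two role flags might be combined when starting a small cluster. The flag names come from the DEFINE_bool() calls added in be/src/service/impala-server.cc; the trailing '...' stands for whatever other impalad options a deployment already passes.

    # Dedicated coordinator: accepts and coordinates client queries, but is
    # excluded from fragment scheduling (hypothetical layout, other flags omitted).
    impalad --is_coordinator=true --is_executor=false ...

    # Dedicated executors: run query fragments only and refuse client connections.
    impalad --is_coordinator=false --is_executor=true ...

Both flags default to true, so existing deployments keep the combined coordinator/executor behavior; starting a daemon with both flags set to false is rejected at startup.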
Testing: - Added a custom cluster test. - Added a new scheduler test. Change-Id: I5d2ff7f341c9d2b0649e4d14561077e166ad7c4d Reviewed-on: http://gerrit.cloudera.org:8080/6628 Reviewed-by: Dimitris Tsirogiannis <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/e2c53a8b Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/e2c53a8b Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/e2c53a8b Branch: refs/heads/master Commit: e2c53a8bdf646331b29c3de921b681d0d885c82e Parents: 5809317 Author: Dimitris Tsirogiannis <[email protected]> Authored: Thu Apr 13 11:18:47 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Wed Apr 26 01:45:40 2017 +0000 ---------------------------------------------------------------------- be/src/scheduling/scheduler-test-util.cc | 13 +- be/src/scheduling/scheduler-test-util.h | 15 +- be/src/scheduling/scheduler-test.cc | 21 ++ be/src/scheduling/scheduler.cc | 285 ++++++++++++------------- be/src/scheduling/scheduler.h | 220 ++++++++++--------- be/src/service/impala-http-handler.cc | 19 ++ be/src/service/impala-http-handler.h | 10 + be/src/service/impala-server.cc | 24 ++- be/src/service/impala-server.h | 28 ++- be/src/util/network-util.cc | 2 + be/src/util/webserver.cc | 10 +- bin/start-impala-cluster.py | 27 ++- common/thrift/StatestoreService.thrift | 13 +- tests/common/custom_cluster_test_suite.py | 7 +- tests/custom_cluster/test_coordinators.py | 34 +++ www/backends.tmpl | 8 +- www/root.tmpl | 3 +- 17 files changed, 432 insertions(+), 307 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/scheduling/scheduler-test-util.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc index 782379c..7547782 100644 --- a/be/src/scheduling/scheduler-test-util.cc +++ b/be/src/scheduling/scheduler-test-util.cc @@ -58,14 +58,14 @@ const string Cluster::IP_PREFIX = "10"; /// Default size for new blocks is 1MB. const int64_t Block::DEFAULT_BLOCK_SIZE = 1 << 20; -int Cluster::AddHost(bool has_backend, bool has_datanode) { int host_idx = hosts_.size(); int be_port = has_backend ? BACKEND_PORT : -1; int dn_port = has_datanode ? DATANODE_PORT : -1; IpAddr ip = HostIdxToIpAddr(host_idx); DCHECK(ip_to_idx_.find(ip) == ip_to_idx_.end()); ip_to_idx_[ip] = host_idx; - hosts_.push_back(Host(HostIdxToHostname(host_idx), ip, be_port, dn_port)); + hosts_.push_back(Host(HostIdxToHostname(host_idx), ip, be_port, dn_port, is_executor)); // Add host to lists of backend indexes per type. 
if (has_backend) backend_host_idxs_.push_back(host_idx); if (has_datanode) { @@ -79,8 +79,9 @@ int Cluster::AddHost(bool has_backend, bool has_datanode) { return host_idx; } -void Cluster::AddHosts(int num_hosts, bool has_backend, bool has_datanode) { - for (int i = 0; i < num_hosts; ++i) AddHost(has_backend, has_datanode); +void Cluster::AddHosts(int num_hosts, bool has_backend, bool has_datanode, + bool is_executor) { + for (int i = 0; i < num_hosts; ++i) AddHost(has_backend, has_datanode, is_executor); } Hostname Cluster::HostIdxToHostname(int host_idx) { @@ -454,7 +455,7 @@ Status SchedulerWrapper::Compute(bool exec_at_coord, Result* result) { // Compute Assignment. FragmentScanRangeAssignment* assignment = result->AddAssignment(); - return scheduler_->ComputeScanRangeAssignment(*scheduler_->GetBackendConfig(), 0, + return scheduler_->ComputeScanRangeAssignment(*scheduler_->GetExecutorsConfig(), 0, nullptr, false, plan_.scan_range_locations(), plan_.referenced_datanodes(), exec_at_coord, plan_.query_options(), nullptr, assignment); } @@ -519,6 +520,8 @@ void SchedulerWrapper::AddHostToTopicDelta(const Host& host, TTopicDelta* delta) be_desc.address.hostname = host.ip; be_desc.address.port = host.be_port; be_desc.ip_address = host.ip; + be_desc.__set_is_coordinator(host.is_coordinator); + be_desc.__set_is_executor(host.is_executor); // Build topic item. TTopicItem item; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/scheduling/scheduler-test-util.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/scheduler-test-util.h b/be/src/scheduling/scheduler-test-util.h index 6cce021..c3d6b1c 100644 --- a/be/src/scheduling/scheduler-test-util.h +++ b/be/src/scheduling/scheduler-test-util.h @@ -86,13 +86,17 @@ enum class ReplicaPlacement { /// Host model. Each host can have either a backend, a datanode, or both. To specify that /// a host should not act as a backend or datanode specify '-1' as the respective port. +/// A host with a backend is always a coordinator but it may not be an executor. struct Host { - Host(const Hostname& name, const IpAddr& ip, int be_port, int dn_port) - : name(name), ip(ip), be_port(be_port), dn_port(dn_port) {} + Host(const Hostname& name, const IpAddr& ip, int be_port, int dn_port, bool is_executor) + : name(name), ip(ip), be_port(be_port), dn_port(dn_port), is_coordinator(true), + is_executor(is_executor) {} Hostname name; IpAddr ip; int be_port; // Backend port int dn_port; // Datanode port + bool is_coordinator; // True if this is a coordinator host + bool is_executor; // True if this is an executor host }; /// A cluster stores a list of hosts and provides various methods to add hosts to the @@ -101,10 +105,13 @@ class Cluster { public: /// Add a host and return the host's index. 'hostname' and 'ip' of the new host will be /// generated and are guaranteed to be unique. - int AddHost(bool has_backend, bool has_datanode); + /// TODO: Refactor the construction of a host and its addition to a cluster to + /// avoid the boolean input parameters. + int AddHost(bool has_backend, bool has_datanode, bool is_executor = true); /// Add a number of hosts with the same properties by repeatedly calling AddHost(..). - void AddHosts(int num_hosts, bool has_backend, bool has_datanode); + void AddHosts(int num_hosts, bool has_backend, bool has_datanode, + bool is_executor = true); /// Convert a host index to a hostname. 
static Hostname HostIdxToHostname(int host_idx); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/scheduling/scheduler-test.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/scheduler-test.cc b/be/src/scheduling/scheduler-test.cc index 3e05c5b..c7b284b 100644 --- a/be/src/scheduling/scheduler-test.cc +++ b/be/src/scheduling/scheduler-test.cc @@ -52,6 +52,27 @@ TEST_F(SchedulerTest, SingleHostSingleFile) { EXPECT_EQ(0, result.NumCachedAssignments()); } +/// Test cluster configuration with one coordinator that can't process scan ranges. +TEST_F(SchedulerTest, SingleCoordinatorNoExecutor) { + Cluster cluster; + cluster.AddHost(true, true, false); + cluster.AddHost(true, true, true); + cluster.AddHost(true, true, true); + + Schema schema(cluster); + schema.AddMultiBlockTable("T1", 10, ReplicaPlacement::LOCAL_ONLY, 3); + + Plan plan(schema); + plan.AddTableScan("T1"); + + Result result(plan); + SchedulerWrapper scheduler(plan); + scheduler.Compute(&result); + + EXPECT_EQ(2, result.NumDistinctBackends()); + EXPECT_EQ(0, result.NumDiskAssignments(0)); +} + /// Test assigning all scan ranges to the coordinator. TEST_F(SchedulerTest, ExecAtCoord) { Cluster cluster; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/scheduling/scheduler.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc index a73e13a..bca5965 100644 --- a/be/src/scheduling/scheduler.cc +++ b/be/src/scheduling/scheduler.cc @@ -29,7 +29,6 @@ #include "common/logging.h" #include "gen-cpp/ImpalaInternalService_constants.h" #include "gen-cpp/Types_types.h" -#include "rapidjson/rapidjson.h" #include "runtime/exec-env.h" #include "statestore/statestore-subscriber.h" #include "util/container-util.h" @@ -41,7 +40,6 @@ using boost::algorithm::join; using namespace apache::thrift; -using namespace rapidjson; using namespace strings; DECLARE_int32(be_port); @@ -54,15 +52,12 @@ static const string ASSIGNMENTS_KEY("simple-scheduler.assignments.total"); static const string SCHEDULER_INIT_KEY("simple-scheduler.initialized"); static const string NUM_BACKENDS_KEY("simple-scheduler.num-backends"); -static const string BACKENDS_WEB_PAGE = "/backends"; -static const string BACKENDS_TEMPLATE = "backends.tmpl"; - const string Scheduler::IMPALA_MEMBERSHIP_TOPIC("impala-membership"); Scheduler::Scheduler(StatestoreSubscriber* subscriber, const string& backend_id, const TNetworkAddress& backend_address, MetricGroup* metrics, Webserver* webserver, RequestPoolService* request_pool_service) - : backend_config_(std::make_shared<const BackendConfig>()), + : executors_config_(std::make_shared<const BackendConfig>()), metrics_(metrics->GetOrCreateChildGroup("scheduler")), webserver_(webserver), statestore_subscriber_(subscriber), @@ -77,7 +72,7 @@ Scheduler::Scheduler(StatestoreSubscriber* subscriber, const string& backend_id, Scheduler::Scheduler(const vector<TNetworkAddress>& backends, MetricGroup* metrics, Webserver* webserver, RequestPoolService* request_pool_service) - : backend_config_(std::make_shared<const BackendConfig>(backends)), + : executors_config_(std::make_shared<const BackendConfig>(backends)), metrics_(metrics), webserver_(webserver), statestore_subscriber_(nullptr), @@ -109,13 +104,6 @@ Status Scheduler::Init() { coord_only_backend_config_.AddBackend(local_backend_descriptor_); - if (webserver_ != nullptr) { - Webserver::UrlCallback 
backends_callback = - bind<void>(mem_fn(&Scheduler::BackendsUrlCallback), this, _1, _2); - webserver_->RegisterUrlCallback( - BACKENDS_WEB_PAGE, BACKENDS_TEMPLATE, backends_callback); - } - if (statestore_subscriber_ != nullptr) { StatestoreSubscriber::UpdateCallback cb = bind<void>(mem_fn(&Scheduler::UpdateMembership), this, _1, _2); @@ -128,8 +116,8 @@ Status Scheduler::Init() { if (metrics_ != nullptr) { // This is after registering with the statestored, so we already have to synchronize - // access to the backend_config_ shared_ptr. - int num_backends = GetBackendConfig()->NumBackends(); + // access to the executors_config_ shared_ptr. + int num_backends = GetExecutorsConfig()->NumBackends(); total_assignments_ = metrics_->AddCounter<int64_t>(ASSIGNMENTS_KEY, 0); total_local_assignments_ = metrics_->AddCounter<int64_t>(LOCAL_ASSIGNMENTS_KEY, 0); initialized_ = metrics_->AddProperty(SCHEDULER_INIT_KEY, true); @@ -152,20 +140,6 @@ Status Scheduler::Init() { return Status::OK(); } -void Scheduler::BackendsUrlCallback( - const Webserver::ArgumentMap& args, Document* document) { - BackendConfig::BackendList backends; - BackendConfigPtr backend_config = GetBackendConfig(); - backend_config->GetAllBackends(&backends); - Value backends_list(kArrayType); - for (const TBackendDescriptor& backend : backends) { - Value str(TNetworkAddressToString(backend.address).c_str(), document->GetAllocator()); - backends_list.PushBack(str, document->GetAllocator()); - } - - document->AddMember("backends", backends_list, document->GetAllocator()); -} - void Scheduler::UpdateMembership( const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas, vector<TTopicDelta>* subscriber_topic_updates) { @@ -177,26 +151,29 @@ void Scheduler::UpdateMembership( const TTopicDelta& delta = topic->second; // If the delta transmitted by the statestore is empty we can skip processing - // altogether and avoid making a copy of backend_config_. + // altogether and avoid making a copy of executors_config_. if (delta.is_delta && delta.topic_entries.empty() && delta.topic_deletions.empty()) { return; } // This function needs to handle both delta and non-delta updates. To minimize the - // time needed to hold locks, all updates are applied to a copy of backend_config_, + // time needed to hold locks, all updates are applied to a copy of + // executors_config_, // which is then swapped into place atomically. - std::shared_ptr<BackendConfig> new_backend_config; + std::shared_ptr<BackendConfig> new_executors_config; if (!delta.is_delta) { - current_membership_.clear(); - new_backend_config = std::make_shared<BackendConfig>(); + current_executors_.clear(); + new_executors_config = std::make_shared<BackendConfig>(); } else { // Make a copy - lock_guard<mutex> lock(backend_config_lock_); - new_backend_config = std::make_shared<BackendConfig>(*backend_config_); + lock_guard<mutex> lock(executors_config_lock_); + new_executors_config = std::make_shared<BackendConfig>(*executors_config_); } - // Process new entries to the topic + // Process new entries to the topic. Update executors_config_ and + // current_executors_ to match the set of executors given by the + // subscriber_topic_updates. 
for (const TTopicItem& item : delta.topic_entries) { TBackendDescriptor be_desc; // Benchmarks have suggested that this method can deserialize @@ -225,42 +202,44 @@ void Scheduler::UpdateMembership( << be_desc.address; continue; } - new_backend_config->AddBackend(be_desc); - current_membership_.insert(make_pair(item.key, be_desc)); + if (be_desc.is_executor) { + new_executors_config->AddBackend(be_desc); + current_executors_.insert(make_pair(item.key, be_desc)); + } } // Process deletions from the topic for (const string& backend_id : delta.topic_deletions) { - if (current_membership_.find(backend_id) != current_membership_.end()) { - new_backend_config->RemoveBackend(current_membership_[backend_id]); - current_membership_.erase(backend_id); + if (current_executors_.find(backend_id) != current_executors_.end()) { + new_executors_config->RemoveBackend(current_executors_[backend_id]); + current_executors_.erase(backend_id); } } - SetBackendConfig(new_backend_config); + SetExecutorsConfig(new_executors_config); if (metrics_ != nullptr) { /// TODO-MT: fix this (do we even need to report it?) - num_fragment_instances_metric_->set_value(current_membership_.size()); + num_fragment_instances_metric_->set_value(current_executors_.size()); } } -Scheduler::BackendConfigPtr Scheduler::GetBackendConfig() const { - lock_guard<mutex> l(backend_config_lock_); - DCHECK(backend_config_.get() != nullptr); - BackendConfigPtr backend_config = backend_config_; - return backend_config; +Scheduler::ExecutorsConfigPtr Scheduler::GetExecutorsConfig() const { + lock_guard<mutex> l(executors_config_lock_); + DCHECK(executors_config_.get() != nullptr); + ExecutorsConfigPtr executor_config = executors_config_; + return executor_config; } -void Scheduler::SetBackendConfig(const BackendConfigPtr& backend_config) { - lock_guard<mutex> l(backend_config_lock_); - backend_config_ = backend_config; +void Scheduler::SetExecutorsConfig(const ExecutorsConfigPtr& executors_config) { + lock_guard<mutex> l(executors_config_lock_); + executors_config_ = executors_config; } Status Scheduler::ComputeScanRangeAssignment(QuerySchedule* schedule) { RuntimeProfile::Counter* total_assignment_timer = ADD_TIMER(schedule->summary_profile(), "ComputeScanRangeAssignmentTimer"); - BackendConfigPtr backend_config = GetBackendConfig(); + ExecutorsConfigPtr executor_config = GetExecutorsConfig(); const TQueryExecRequest& exec_request = schedule->request(); for (const TPlanExecInfo& plan_exec_info : exec_request.plan_exec_info) { for (const auto& entry : plan_exec_info.per_node_scan_ranges) { @@ -283,7 +262,7 @@ Status Scheduler::ComputeScanRangeAssignment(QuerySchedule* schedule) { FragmentScanRangeAssignment* assignment = &schedule->GetFragmentExecParams(fragment.idx)->scan_range_assignment; RETURN_IF_ERROR( - ComputeScanRangeAssignment(*backend_config, node_id, node_replica_preference, + ComputeScanRangeAssignment(*executor_config, node_id, node_replica_preference, node_random_replica, entry.second, exec_request.host_list, exec_at_coord, schedule->query_options(), total_assignment_timer, assignment)); schedule->IncNumScanRanges(entry.second.size()); @@ -518,13 +497,13 @@ void Scheduler::CreateCollocatedInstances( } } -Status Scheduler::ComputeScanRangeAssignment(const BackendConfig& backend_config, +Status Scheduler::ComputeScanRangeAssignment(const BackendConfig& executor_config, PlanNodeId node_id, const TReplicaPreference::type* node_replica_preference, bool node_random_replica, const vector<TScanRangeLocationList>& locations, const 
vector<TNetworkAddress>& host_list, bool exec_at_coord, const TQueryOptions& query_options, RuntimeProfile::Counter* timer, FragmentScanRangeAssignment* assignment) { - if (backend_config.NumBackends() == 0 && !exec_at_coord) { + if (executor_config.NumBackends() == 0 && !exec_at_coord) { return Status(TErrorCode::NO_REGISTERED_BACKENDS); } @@ -544,40 +523,40 @@ Status Scheduler::ComputeScanRangeAssignment(const BackendConfig& backend_config // A preference attached to the plan node takes precedence. if (node_replica_preference) base_distance = *node_replica_preference; - // Between otherwise equivalent backends we optionally break ties by comparing their + // Between otherwise equivalent executors we optionally break ties by comparing their // random rank. bool random_replica = query_options.schedule_random_replica || node_random_replica; AssignmentCtx assignment_ctx( - exec_at_coord ? coord_only_backend_config_ : backend_config, total_assignments_, + exec_at_coord ? coord_only_backend_config_ : executor_config, total_assignments_, total_local_assignments_); vector<const TScanRangeLocationList*> remote_scan_range_locations; - // Loop over all scan ranges, select a backend for those with local impalads and collect - // all others for later processing. + // Loop over all scan ranges, select an executor for those with local impalads and + // collect all others for later processing. for (const TScanRangeLocationList& scan_range_locations : locations) { TReplicaPreference::type min_distance = TReplicaPreference::REMOTE; - // Select backend host for the current scan range. + // Select executor for the current scan range. if (exec_at_coord) { - DCHECK(assignment_ctx.backend_config().LookUpBackendIp( + DCHECK(assignment_ctx.executor_config().LookUpBackendIp( local_backend_descriptor_.address.hostname, nullptr)); assignment_ctx.RecordScanRangeAssignment(local_backend_descriptor_, node_id, host_list, scan_range_locations, assignment); } else { - // Collect backend candidates with smallest memory distance. - vector<IpAddr> backend_candidates; + // Collect executor candidates with smallest memory distance. + vector<IpAddr> executor_candidates; if (base_distance < TReplicaPreference::REMOTE) { for (const TScanRangeLocation& location : scan_range_locations.locations) { const TNetworkAddress& replica_host = host_list[location.host_idx]; - // Determine the adjusted memory distance to the closest backend for the replica - // host. + // Determine the adjusted memory distance to the closest executor for the + // replica host. TReplicaPreference::type memory_distance = TReplicaPreference::REMOTE; - IpAddr backend_ip; - bool has_local_backend = assignment_ctx.backend_config().LookUpBackendIp( - replica_host.hostname, &backend_ip); - if (has_local_backend) { + IpAddr executor_ip; + bool has_local_executor = assignment_ctx.executor_config().LookUpBackendIp( + replica_host.hostname, &executor_ip); + if (has_local_executor) { if (location.is_cached) { memory_distance = TReplicaPreference::CACHE_LOCAL; } else { @@ -588,58 +567,58 @@ Status Scheduler::ComputeScanRangeAssignment(const BackendConfig& backend_config } memory_distance = max(memory_distance, base_distance); - // We only need to collect backend candidates for non-remote reads, as it is the - // nature of remote reads that there is no backend available. + // We only need to collect executor candidates for non-remote reads, as it is + // the nature of remote reads that there is no executor available. 
if (memory_distance < TReplicaPreference::REMOTE) { - DCHECK(has_local_backend); + DCHECK(has_local_executor); // Check if we found a closer replica than the previous ones. if (memory_distance < min_distance) { min_distance = memory_distance; - backend_candidates.clear(); - backend_candidates.push_back(backend_ip); + executor_candidates.clear(); + executor_candidates.push_back(executor_ip); } else if (memory_distance == min_distance) { - backend_candidates.push_back(backend_ip); + executor_candidates.push_back(executor_ip); } } } } // End of candidate selection. - DCHECK(!backend_candidates.empty() || min_distance == TReplicaPreference::REMOTE); + DCHECK(!executor_candidates.empty() || min_distance == TReplicaPreference::REMOTE); // Check the effective memory distance of the candidates to decide whether to treat // the scan range as cached. bool cached_replica = min_distance == TReplicaPreference::CACHE_LOCAL; - // Pick backend host based on data location. - bool local_backend = min_distance != TReplicaPreference::REMOTE; + // Pick executor based on data location. + bool local_executor = min_distance != TReplicaPreference::REMOTE; - if (!local_backend) { + if (!local_executor) { remote_scan_range_locations.push_back(&scan_range_locations); continue; } - // For local reads we want to break ties by backend rank in these cases: + // For local reads we want to break ties by executor rank in these cases: // - if it is enforced via a query option. // - when selecting between cached replicas. In this case there is no OS buffer // cache to worry about. - // Remote reads will always break ties by backend rank. + // Remote reads will always break ties by executor rank. bool decide_local_assignment_by_rank = random_replica || cached_replica; - const IpAddr* backend_ip = nullptr; - backend_ip = assignment_ctx.SelectLocalBackendHost( - backend_candidates, decide_local_assignment_by_rank); - TBackendDescriptor backend; - assignment_ctx.SelectBackendOnHost(*backend_ip, &backend); + const IpAddr* executor_ip = nullptr; + executor_ip = assignment_ctx.SelectLocalExecutor( + executor_candidates, decide_local_assignment_by_rank); + TBackendDescriptor executor; + assignment_ctx.SelectExecutorOnHost(*executor_ip, &executor); assignment_ctx.RecordScanRangeAssignment( - backend, node_id, host_list, scan_range_locations, assignment); - } // End of backend host selection. + executor, node_id, host_list, scan_range_locations, assignment); + } // End of executor selection. } // End of for loop over scan ranges. - // Assign remote scans to backends. + // Assign remote scans to executors. 
for (const TScanRangeLocationList* scan_range_locations : remote_scan_range_locations) { DCHECK(!exec_at_coord); - const IpAddr* backend_ip = assignment_ctx.SelectRemoteBackendHost(); - TBackendDescriptor backend; - assignment_ctx.SelectBackendOnHost(*backend_ip, &backend); + const IpAddr* executor_ip = assignment_ctx.SelectRemoteExecutor(); + TBackendDescriptor executor; + assignment_ctx.SelectExecutorOnHost(*executor_ip, &executor); assignment_ctx.RecordScanRangeAssignment( - backend, node_id, host_list, *scan_range_locations, assignment); + executor, node_id, host_list, *scan_range_locations, assignment); } if (VLOG_FILE_IS_ON) assignment_ctx.PrintAssignment(*assignment); @@ -736,22 +715,22 @@ Status Scheduler::Schedule(QuerySchedule* schedule) { return Status::OK(); } -Scheduler::AssignmentCtx::AssignmentCtx(const BackendConfig& backend_config, +Scheduler::AssignmentCtx::AssignmentCtx(const BackendConfig& executor_config, IntCounter* total_assignments, IntCounter* total_local_assignments) - : backend_config_(backend_config), - first_unused_backend_idx_(0), + : executors_config_(executor_config), + first_unused_executor_idx_(0), total_assignments_(total_assignments), total_local_assignments_(total_local_assignments) { - DCHECK_GT(backend_config.NumBackends(), 0); - backend_config.GetAllBackendIps(&random_backend_order_); + DCHECK_GT(executor_config.NumBackends(), 0); + executor_config.GetAllBackendIps(&random_executor_order_); std::mt19937 g(rand()); - std::shuffle(random_backend_order_.begin(), random_backend_order_.end(), g); - // Initialize inverted map for backend rank lookups + std::shuffle(random_executor_order_.begin(), random_executor_order_.end(), g); + // Initialize inverted map for executor rank lookups int i = 0; - for (const IpAddr& ip : random_backend_order_) random_backend_rank_[ip] = i++; + for (const IpAddr& ip : random_executor_order_) random_executor_rank_[ip] = i++; } -const IpAddr* Scheduler::AssignmentCtx::SelectLocalBackendHost( +const IpAddr* Scheduler::AssignmentCtx::SelectLocalExecutor( const std::vector<IpAddr>& data_locations, bool break_ties_by_rank) { DCHECK(!data_locations.empty()); // List of candidate indexes into 'data_locations'. @@ -759,9 +738,9 @@ const IpAddr* Scheduler::AssignmentCtx::SelectLocalBackendHost( // Find locations with minimum number of assigned bytes. int64_t min_assigned_bytes = numeric_limits<int64_t>::max(); for (int i = 0; i < data_locations.size(); ++i) { - const IpAddr& backend_ip = data_locations[i]; + const IpAddr& executor_ip = data_locations[i]; int64_t assigned_bytes = 0; - auto handle_it = assignment_heap_.find(backend_ip); + auto handle_it = assignment_heap_.find(executor_ip); if (handle_it != assignment_heap_.end()) { assigned_bytes = (*handle_it->second).assigned_bytes; } @@ -777,69 +756,69 @@ const IpAddr* Scheduler::AssignmentCtx::SelectLocalBackendHost( if (break_ties_by_rank) { min_rank_idx = min_element(candidates_idxs.begin(), candidates_idxs.end(), [&data_locations, this](const int& a, const int& b) { - return GetBackendRank(data_locations[a]) < GetBackendRank(data_locations[b]); + return GetExecutorRank(data_locations[a]) < GetExecutorRank(data_locations[b]); }); } return &data_locations[*min_rank_idx]; } -const IpAddr* Scheduler::AssignmentCtx::SelectRemoteBackendHost() { +const IpAddr* Scheduler::AssignmentCtx::SelectRemoteExecutor() { const IpAddr* candidate_ip; - if (HasUnusedBackends()) { - // Pick next unused backend. 
- candidate_ip = GetNextUnusedBackendAndIncrement(); + if (HasUnusedExecutors()) { + // Pick next unused executor. + candidate_ip = GetNextUnusedExecutorAndIncrement(); } else { - // Pick next backend from assignment_heap. All backends must have been inserted into + // Pick next executor from assignment_heap. All executors must have been inserted into // the heap at this point. - DCHECK_GT(backend_config_.NumBackends(), 0); - DCHECK_EQ(backend_config_.NumBackends(), assignment_heap_.size()); + DCHECK_GT(executors_config_.NumBackends(), 0); + DCHECK_EQ(executors_config_.NumBackends(), assignment_heap_.size()); candidate_ip = &(assignment_heap_.top().ip); } DCHECK(candidate_ip != nullptr); return candidate_ip; } -bool Scheduler::AssignmentCtx::HasUnusedBackends() const { - return first_unused_backend_idx_ < random_backend_order_.size(); +bool Scheduler::AssignmentCtx::HasUnusedExecutors() const { + return first_unused_executor_idx_ < random_executor_order_.size(); } -const IpAddr* Scheduler::AssignmentCtx::GetNextUnusedBackendAndIncrement() { - DCHECK(HasUnusedBackends()); - const IpAddr* ip = &random_backend_order_[first_unused_backend_idx_++]; +const IpAddr* Scheduler::AssignmentCtx::GetNextUnusedExecutorAndIncrement() { + DCHECK(HasUnusedExecutors()); + const IpAddr* ip = &random_executor_order_[first_unused_executor_idx_++]; return ip; } -int Scheduler::AssignmentCtx::GetBackendRank(const IpAddr& ip) const { - auto it = random_backend_rank_.find(ip); - DCHECK(it != random_backend_rank_.end()); +int Scheduler::AssignmentCtx::GetExecutorRank(const IpAddr& ip) const { + auto it = random_executor_rank_.find(ip); + DCHECK(it != random_executor_rank_.end()); return it->second; } -void Scheduler::AssignmentCtx::SelectBackendOnHost( - const IpAddr& backend_ip, TBackendDescriptor* backend) { - DCHECK(backend_config_.LookUpBackendIp(backend_ip, nullptr)); - const BackendConfig::BackendList& backends_on_host = - backend_config_.GetBackendListForHost(backend_ip); - DCHECK(backends_on_host.size() > 0); - if (backends_on_host.size() == 1) { - *backend = *backends_on_host.begin(); +void Scheduler::AssignmentCtx::SelectExecutorOnHost( + const IpAddr& executor_ip, TBackendDescriptor* executor) { + DCHECK(executors_config_.LookUpBackendIp(executor_ip, nullptr)); + const BackendConfig::BackendList& executors_on_host = + executors_config_.GetBackendListForHost(executor_ip); + DCHECK(executors_on_host.size() > 0); + if (executors_on_host.size() == 1) { + *executor = *executors_on_host.begin(); } else { - BackendConfig::BackendList::const_iterator* next_backend_on_host; - next_backend_on_host = - FindOrInsert(&next_backend_per_host_, backend_ip, backends_on_host.begin()); - DCHECK(find(backends_on_host.begin(), backends_on_host.end(), **next_backend_on_host) - != backends_on_host.end()); - *backend = **next_backend_on_host; + BackendConfig::BackendList::const_iterator* next_executor_on_host; + next_executor_on_host = + FindOrInsert(&next_executor_per_host_, executor_ip, executors_on_host.begin()); + DCHECK(find(executors_on_host.begin(), executors_on_host.end(), + **next_executor_on_host) != executors_on_host.end()); + *executor = **next_executor_on_host; // Rotate - ++(*next_backend_on_host); - if (*next_backend_on_host == backends_on_host.end()) { - *next_backend_on_host = backends_on_host.begin(); + ++(*next_executor_on_host); + if (*next_executor_on_host == executors_on_host.end()) { + *next_executor_on_host = executors_on_host.begin(); } } } void Scheduler::AssignmentCtx::RecordScanRangeAssignment( - 
const TBackendDescriptor& backend, PlanNodeId node_id, + const TBackendDescriptor& executor, PlanNodeId node_id, const vector<TNetworkAddress>& host_list, const TScanRangeLocationList& scan_range_locations, FragmentScanRangeAssignment* assignment) { @@ -852,12 +831,12 @@ void Scheduler::AssignmentCtx::RecordScanRangeAssignment( scan_range_length = 1000; } - IpAddr backend_ip; - bool ret = backend_config_.LookUpBackendIp(backend.address.hostname, &backend_ip); + IpAddr executor_ip; + bool ret = executors_config_.LookUpBackendIp(executor.address.hostname, &executor_ip); DCHECK(ret); - DCHECK(!backend_ip.empty()); + DCHECK(!executor_ip.empty()); assignment_heap_.InsertOrUpdate( - backend_ip, scan_range_length, GetBackendRank(backend_ip)); + executor_ip, scan_range_length, GetExecutorRank(executor_ip)); // See if the read will be remote. This is not the case if the impalad runs on one of // the replica's datanodes. @@ -869,8 +848,8 @@ void Scheduler::AssignmentCtx::RecordScanRangeAssignment( for (const TScanRangeLocation& location : scan_range_locations.locations) { const TNetworkAddress& replica_host = host_list[location.host_idx]; IpAddr replica_ip; - if (backend_config_.LookUpBackendIp(replica_host.hostname, &replica_ip) - && backend_ip == replica_ip) { + if (executors_config_.LookUpBackendIp(replica_host.hostname, &replica_ip) + && executor_ip == replica_ip) { remote_read = false; volume_id = location.volume_id; is_cached = location.is_cached; @@ -892,7 +871,7 @@ void Scheduler::AssignmentCtx::RecordScanRangeAssignment( } PerNodeScanRanges* scan_ranges = - FindOrInsert(assignment, backend.address, PerNodeScanRanges()); + FindOrInsert(assignment, executor.address, PerNodeScanRanges()); vector<TScanRangeParams>* scan_range_params_list = FindOrInsert(scan_ranges, node_id, vector<TScanRangeParams>()); // Add scan range. @@ -904,7 +883,7 @@ void Scheduler::AssignmentCtx::RecordScanRangeAssignment( scan_range_params_list->push_back(scan_range_params); if (VLOG_FILE_IS_ON) { - VLOG_FILE << "Scheduler assignment to backend: " << backend.address << "(" + VLOG_FILE << "Scheduler assignment to executor: " << executor.address << "(" << (remote_read ? "remote" : "local") << " selection)"; } } @@ -932,16 +911,16 @@ void Scheduler::AssignmentCtx::PrintAssignment( void Scheduler::AddressableAssignmentHeap::InsertOrUpdate( const IpAddr& ip, int64_t assigned_bytes, int rank) { - auto handle_it = backend_handles_.find(ip); - if (handle_it == backend_handles_.end()) { - AssignmentHeap::handle_type handle = backend_heap_.push({assigned_bytes, rank, ip}); - backend_handles_.emplace(ip, handle); + auto handle_it = executor_handles_.find(ip); + if (handle_it == executor_handles_.end()) { + AssignmentHeap::handle_type handle = executor_heap_.push({assigned_bytes, rank, ip}); + executor_handles_.emplace(ip, handle); } else { // We need to rebuild the heap after every update operation. Calling decrease once is // sufficient as both assignments decrease the key. 
AssignmentHeap::handle_type handle = handle_it->second; (*handle).assigned_bytes += assigned_bytes; - backend_heap_.decrease(handle); + executor_heap_.decrease(handle); } } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/scheduling/scheduler.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h index ca520c8..7adfb5d 100644 --- a/be/src/scheduling/scheduler.h +++ b/be/src/scheduling/scheduler.h @@ -48,12 +48,12 @@ namespace test { class SchedulerWrapper; } -/// Performs simple scheduling by matching between a list of backends configured +/// Performs simple scheduling by matching between a list of executor backends configured /// either from the statestore, or from a static list of addresses, and a list -/// of target data locations. The current set of backends is stored in backend_config_. -/// When receiving changes to the backend configuration from the statestore we will make a -/// copy of this configuration, apply the updates to the copy and atomically swap the -/// contents of the backend_config_ pointer. +/// of target data locations. The current set of executors is stored in executors_config_. +/// When receiving changes to the executor configuration from the statestore we will make +/// a copy of this configuration, apply the updates to the copy and atomically swap the +/// contents of the executors_config_ pointer. /// /// TODO: Notice when there are duplicate statestore registrations (IMPALA-23) /// TODO: Track assignments (assignment_ctx in ComputeScanRangeAssignment) per query @@ -67,9 +67,9 @@ class SchedulerWrapper; /// to make it testable. /// TODO: Benchmark the performance of the scheduler. The tests need to include setups /// with: -/// - Small and large number of backends. +/// - Small and large number of executors. /// - Small and large query plans. -/// - Scheduling query plans with concurrent updates to the internal backend +/// - Scheduling query plans with concurrent updates to the internal executor /// configuration. class Scheduler { public: @@ -102,28 +102,28 @@ class Scheduler { Status Schedule(QuerySchedule* schedule); private: - /// Map from a host's IP address to the next backend to be round-robin scheduled for - /// that host (needed for setups with multiple backends on a single host) + /// Map from a host's IP address to the next executor to be round-robin scheduled for + /// that host (needed for setups with multiple executors on a single host) typedef boost::unordered_map<IpAddr, BackendConfig::BackendList::const_iterator> - NextBackendPerHost; + NextExecutorPerHost; - typedef std::shared_ptr<const BackendConfig> BackendConfigPtr; + typedef std::shared_ptr<const BackendConfig> ExecutorsConfigPtr; - /// Internal structure to track scan range assignments for a backend host. This struct + /// Internal structure to track scan range assignments for an executor host. This struct /// is used as the heap element in and maintained by AddressableAssignmentHeap. - struct BackendAssignmentInfo { - /// The number of bytes assigned to a backend host. + struct ExecutorAssignmentInfo { + /// The number of bytes assigned to an executor. int64_t assigned_bytes; /// Each host gets assigned a random rank to break ties in a random but deterministic /// order per plan node. const int random_rank; - /// IP address of the backend. + /// IP address of the executor. IpAddr ip; /// Compare two elements of this struct. 
The key is (assigned_bytes, random_rank). - bool operator>(const BackendAssignmentInfo& rhs) const { + bool operator>(const ExecutorAssignmentInfo& rhs) const { if (assigned_bytes != rhs.assigned_bytes) { return assigned_bytes > rhs.assigned_bytes; } @@ -132,87 +132,87 @@ class Scheduler { }; /// Heap to compute candidates for scan range assignments. Elements are of type - /// BackendAssignmentInfo and track assignment information for each backend. By default - /// boost implements a max-heap so we use std::greater<T> to obtain a min-heap. This - /// will make the top() element of the heap be the backend with the lowest number of - /// assigned bytes and the lowest random rank. - typedef boost::heap::binomial_heap<BackendAssignmentInfo, - boost::heap::compare<std::greater<BackendAssignmentInfo>>> + /// ExecutorAssignmentInfo and track assignment information for each executor. By + /// default boost implements a max-heap so we use std::greater<T> to obtain a min-heap. + /// This will make the top() element of the heap be the executor with the lowest number + /// of assigned bytes and the lowest random rank. + typedef boost::heap::binomial_heap<ExecutorAssignmentInfo, + boost::heap::compare<std::greater<ExecutorAssignmentInfo>>> AssignmentHeap; /// Map to look up handles to heap elements to modify heap element keys. - typedef boost::unordered_map<IpAddr, AssignmentHeap::handle_type> BackendHandleMap; + typedef boost::unordered_map<IpAddr, AssignmentHeap::handle_type> ExecutorHandleMap; - /// Class to store backend information in an addressable heap. In addition to + /// Class to store executor information in an addressable heap. In addition to /// AssignmentHeap it can be used to look up heap elements by their IP address and /// update their key. For each plan node we create a new heap, so they are not shared /// between concurrent invocations of the scheduler. class AddressableAssignmentHeap { public: - const AssignmentHeap& backend_heap() const { return backend_heap_; } - const BackendHandleMap& backend_handles() const { return backend_handles_; } + const AssignmentHeap& executor_heap() const { return executor_heap_; } + const ExecutorHandleMap& executor_handles() const { return executor_handles_; } void InsertOrUpdate(const IpAddr& ip, int64_t assigned_bytes, int rank); // Forward interface for boost::heap - decltype(auto) size() const { return backend_heap_.size(); } - decltype(auto) top() const { return backend_heap_.top(); } + decltype(auto) size() const { return executor_heap_.size(); } + decltype(auto) top() const { return executor_heap_.top(); } // Forward interface for boost::unordered_map - decltype(auto) find(const IpAddr& ip) const { return backend_handles_.find(ip); } - decltype(auto) end() const { return backend_handles_.end(); } + decltype(auto) find(const IpAddr& ip) const { return executor_handles_.find(ip); } + decltype(auto) end() const { return executor_handles_.end(); } private: - // Heap to determine next backend. - AssignmentHeap backend_heap_; - // Maps backend IPs to handles in the heap. - BackendHandleMap backend_handles_; + // Heap to determine next executor. + AssignmentHeap executor_heap_; + // Maps executor IPs to handles in the heap. + ExecutorHandleMap executor_handles_; }; /// Class to store context information on assignments during scheduling. It is - /// initialized with a copy of the global backend information and assigns a random rank - /// to each backend to break ties in cases where multiple backends have been assigned - /// the same number or bytes. 
It tracks the number of assigned bytes, which backends + /// initialized with a copy of the global executor information and assigns a random rank + /// to each executor to break ties in cases where multiple executors have been assigned + /// the same number or bytes. It tracks the number of assigned bytes, which executors /// have already been used, etc. Objects of this class are created in /// ComputeScanRangeAssignment() and thus don't need to be thread safe. class AssignmentCtx { public: - AssignmentCtx(const BackendConfig& backend_config, IntCounter* total_assignments, + AssignmentCtx(const BackendConfig& executor_config, IntCounter* total_assignments, IntCounter* total_local_assignments); /// Among hosts in 'data_locations', select the one with the minimum number of - /// assigned bytes. If backends have been assigned equal amounts of work and - /// 'break_ties_by_rank' is true, then the backend rank is used to break ties. - /// Otherwise the first backend according to their order in 'data_locations' is + /// assigned bytes. If executors have been assigned equal amounts of work and + /// 'break_ties_by_rank' is true, then the executor rank is used to break ties. + /// Otherwise the first executor according to their order in 'data_locations' is /// selected. - const IpAddr* SelectLocalBackendHost( + const IpAddr* SelectLocalExecutor( const std::vector<IpAddr>& data_locations, bool break_ties_by_rank); - /// Select a backend host for a remote read. If there are unused backend hosts, then + /// Select an executor for a remote read. If there are unused executor hosts, then /// those will be preferred. Otherwise the one with the lowest number of assigned - /// bytes is picked. If backends have been assigned equal amounts of work, then the - /// backend rank is used to break ties. - const IpAddr* SelectRemoteBackendHost(); + /// bytes is picked. If executors have been assigned equal amounts of work, then the + /// executor rank is used to break ties. + const IpAddr* SelectRemoteExecutor(); - /// Return the next backend that has not been assigned to. This assumes that a - /// returned backend will also be assigned to. The caller must make sure that - /// HasUnusedBackends() is true. - const IpAddr* GetNextUnusedBackendAndIncrement(); + /// Return the next executor that has not been assigned to. This assumes that a + /// returned executor will also be assigned to. The caller must make sure that + /// HasUnusedExecutors() is true. + const IpAddr* GetNextUnusedExecutorAndIncrement(); - /// Pick a backend in round-robin fashion from multiple backends on a single host. - void SelectBackendOnHost(const IpAddr& backend_ip, TBackendDescriptor* backend); + /// Pick an executor in round-robin fashion from multiple executors on a single host. + void SelectExecutorOnHost(const IpAddr& executor_ip, TBackendDescriptor* executor); /// Build a new TScanRangeParams object and append it to the assignment list for the - /// tuple (backend, node_id) in 'assignment'. Also, update assignment_heap_ and + /// tuple (executor, node_id) in 'assignment'. Also, update assignment_heap_ and /// assignment_byte_counters_, increase the counters 'total_assignments_' and /// 'total_local_assignments_'. 'scan_range_locations' contains information about the /// scan range and its replica locations. 
- void RecordScanRangeAssignment(const TBackendDescriptor& backend, PlanNodeId node_id, + void RecordScanRangeAssignment(const TBackendDescriptor& executor, PlanNodeId node_id, const vector<TNetworkAddress>& host_list, const TScanRangeLocationList& scan_range_locations, FragmentScanRangeAssignment* assignment); - const BackendConfig& backend_config() const { return backend_config_; } + const BackendConfig& executor_config() const { return executors_config_; } /// Print the assignment and statistics to VLOG_FILE. void PrintAssignment(const FragmentScanRangeAssignment& assignment); @@ -225,26 +225,26 @@ class Scheduler { int64_t cached_bytes = 0; }; - /// Used to look up hostnames to IP addresses and IP addresses to backend. - const BackendConfig& backend_config_; + /// Used to look up hostnames to IP addresses and IP addresses to executors. + const BackendConfig& executors_config_; - // Addressable heap to select remote backends from. Elements are ordered by the number - // of already assigned bytes (and a random rank to break ties). + // Addressable heap to select remote executors from. Elements are ordered by the + // number of already assigned bytes (and a random rank to break ties). AddressableAssignmentHeap assignment_heap_; - /// Store a random rank per backend host to break ties between otherwise equivalent + /// Store a random rank per executor host to break ties between otherwise equivalent /// replicas (e.g., those having the same number of assigned bytes). - boost::unordered_map<IpAddr, int> random_backend_rank_; + boost::unordered_map<IpAddr, int> random_executor_rank_; - // Index into random_backend_order. It points to the first unused backend and is used - // to select unused backends and inserting them into the assignment_heap_. - int first_unused_backend_idx_; + /// Index into random_executor_order. It points to the first unused executor and is + /// used to select unused executors and inserting them into the assignment_heap_. + int first_unused_executor_idx_; - /// Store a random permutation of backend hosts to select backends from. - std::vector<IpAddr> random_backend_order_; + /// Store a random permutation of executor hosts to select executors from. + std::vector<IpAddr> random_executor_order_; - /// Track round robin information per backend host. - NextBackendPerHost next_backend_per_host_; + /// Track round robin information per executor host. + NextExecutorPerHost next_executor_per_host_; /// Track number of assigned bytes that have been read from cache, locally, or /// remotely. @@ -254,39 +254,40 @@ class Scheduler { IntCounter* total_assignments_; IntCounter* total_local_assignments_; - /// Return whether there are backends that have not been assigned a scan range. - bool HasUnusedBackends() const; + /// Return whether there are executors that have not been assigned a scan range. + bool HasUnusedExecutors() const; - /// Return the rank of a backend. - int GetBackendRank(const IpAddr& ip) const; + /// Return the rank of an executor. + int GetExecutorRank(const IpAddr& ip) const; }; - /// The scheduler's backend configuration. When receiving changes to the backend + /// The scheduler's executors configuration. When receiving changes to the executors /// configuration from the statestore we will make a copy of the stored object, apply /// the updates to the copy and atomically swap the contents of this pointer. 
Each plan - /// node creates a read-only copy of the scheduler's current backend_config_ to use + /// node creates a read-only copy of the scheduler's current executors_config_ to use /// during scheduling. - BackendConfigPtr backend_config_; + ExecutorsConfigPtr executors_config_; /// A backend configuration which only contains the local backend. It is used when /// scheduling on the coordinator. BackendConfig coord_only_backend_config_; - /// Protect access to backend_config_ which might otherwise be updated asynchronously + /// Protect access to executors_config_ which might otherwise be updated asynchronously /// with respect to reads. - mutable boost::mutex backend_config_lock_; + mutable boost::mutex executors_config_lock_; - /// Total number of scan ranges assigned to backends during the lifetime of the + /// Total number of scan ranges assigned to executors during the lifetime of the /// scheduler. int64_t num_assignments_; - /// Map from unique backend id to TBackendDescriptor. Used to track the known backends - /// from the statestore. It's important to track both the backend ID as well as the - /// TBackendDescriptor so we know what is being removed in a given update. - /// Locking of this map is not needed since it should only be read/modified from - /// within the UpdateMembership() function. + /// Map from unique backend ID to TBackendDescriptor. The + /// {backend ID, TBackendDescriptor} pairs represent the IMPALA_MEMBERSHIP_TOPIC + /// {key, value} pairs of known executors retrieved from the statestore. It's important + /// to track both the backend ID as well as the TBackendDescriptor so we know what is + /// being removed in a given update. Locking of this map is not needed since it should + /// only be read/modified from within the UpdateMembership() function. typedef boost::unordered_map<std::string, TBackendDescriptor> BackendIdMap; - BackendIdMap current_membership_; + BackendIdMap current_executors_; /// MetricGroup subsystem access MetricGroup* metrics_; @@ -314,30 +315,22 @@ class Scheduler { /// Initialization metric BooleanProperty* initialized_; - /// Current number of backends + /// Current number of executors IntGauge* num_fragment_instances_metric_; /// Used for user-to-pool resolution and looking up pool configurations. Not owned by /// us. RequestPoolService* request_pool_service_; - /// Helper methods to access backend_config_ (the shared_ptr, not its contents), - /// protecting the access with backend_config_lock_. - BackendConfigPtr GetBackendConfig() const; - void SetBackendConfig(const BackendConfigPtr& backend_config); + /// Helper methods to access executors_config_ (the shared_ptr, not its contents), + /// protecting the access with executors_config_lock_. + ExecutorsConfigPtr GetExecutorsConfig() const; + void SetExecutorsConfig(const ExecutorsConfigPtr& executors_config); /// Called asynchronously when an update is received from the subscription manager void UpdateMembership(const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas, std::vector<TTopicDelta>* subscriber_topic_updates); - /// Webserver callback that produces a list of known backends. - /// Example output: - /// "backends": [ - /// "henry-metrics-pkg-cdh5.ent.cloudera.com:22000" - /// ], - void BackendsUrlCallback( - const Webserver::ArgumentMap& args, rapidjson::Document* document); - /// Determine the pool for a user and query options via request_pool_service_. 
Status GetRequestPool(const std::string& user, const TQueryOptions& query_options, std::string* pool) const; @@ -356,18 +349,19 @@ class Scheduler { /// Otherwise the assignment is computed for each scan range as follows: /// /// Scan ranges refer to data, which is usually replicated on multiple hosts. All scan - /// ranges where one of the replica hosts also runs an impala backend are processed - /// first. If more than one of the replicas run an impala backend, then the 'memory - /// distance' of each backend is considered. The concept of memory distance reflects the - /// cost of moving data into the processing backend's main memory. Reading from cached - /// replicas is generally considered less costly than reading from a local disk, which - /// in turn is cheaper than reading data from a remote node. If multiple backends of the - /// same memory distance are found, then the one with the least amount of previously - /// assigned work is picked, thus aiming to distribute the work as evenly as possible. + /// ranges where one of the replica hosts also runs an impala executor are processed + /// first. If more than one of the replicas run an impala executor, then the 'memory + /// distance' of each executor is considered. The concept of memory distance reflects + /// the cost of moving data into the processing executor's main memory. Reading from + /// cached replicas is generally considered less costly than reading from a local disk, + /// which in turn is cheaper than reading data from a remote node. If multiple executors + /// of the same memory distance are found, then the one with the least amount of + /// previously assigned work is picked, thus aiming to distribute the work as evenly as + /// possible. /// - /// Finally, scan ranges are considered which do not have an impalad backend running on + /// Finally, scan ranges are considered which do not have an impalad executor running on /// any of their data nodes. They will be load-balanced by assigned bytes across all - /// backends + /// executors. /// /// The resulting assignment is influenced by the following query options: /// @@ -384,9 +378,9 @@ class Scheduler { /// false. /// /// schedule_random_replica: - /// When equivalent backends with a memory distance of DISK_LOCAL are found for a scan - /// range (same memory distance, same amount of assigned work), then the first one - /// will be picked deterministically. This aims to make better use of OS buffer + /// When equivalent executors with a memory distance of DISK_LOCAL are found for a + /// scan range (same memory distance, same amount of assigned work), then the first + /// one will be picked deterministically. This aims to make better use of OS buffer /// caches, but can lead to performance bottlenecks on individual hosts. Setting this /// option to true will randomly change the order in which equivalent replicas are /// picked for different plan nodes. This helps to compute a more even assignment, @@ -396,17 +390,17 @@ class Scheduler { /// /// The method takes the following parameters: /// - /// backend_config: Backend configuration to use for scheduling. + /// executor_config: Executor configuration to use for scheduling. /// node_id: ID of the plan node. /// node_replica_preference: Query hint equivalent to replica_preference. /// node_random_replica: Query hint equivalent to schedule_random_replica. - /// locations: List of scan ranges to be assigned to backends. + /// locations: List of scan ranges to be assigned to executors. 
/// host_list: List of hosts, into which 'locations' will index. /// exec_at_coord: Whether to schedule all scan ranges on the coordinator. /// query_options: Query options for the current query. /// timer: Tracks execution time of ComputeScanRangeAssignment. /// assignment: Output parameter, to which new assignments will be added. - Status ComputeScanRangeAssignment(const BackendConfig& backend_config, + Status ComputeScanRangeAssignment(const BackendConfig& executor_config, PlanNodeId node_id, const TReplicaPreference::type* node_replica_preference, bool node_random_replica, const std::vector<TScanRangeLocationList>& locations, const std::vector<TNetworkAddress>& host_list, bool exec_at_coord, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/service/impala-http-handler.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc index ff3c7bc..b9f3382 100644 --- a/be/src/service/impala-http-handler.cc +++ b/be/src/service/impala-http-handler.cc @@ -76,6 +76,9 @@ static Status ParseIdFromArguments(const Webserver::ArgumentMap& args, TUniqueId void ImpalaHttpHandler::RegisterHandlers(Webserver* webserver) { DCHECK(webserver != NULL); + webserver->RegisterUrlCallback("/backends", "backends.tmpl", + MakeCallback(this, &ImpalaHttpHandler::BackendsHandler)); + webserver->RegisterUrlCallback("/hadoop-varz", "hadoop-varz.tmpl", MakeCallback(this, &ImpalaHttpHandler::HadoopVarzHandler)); @@ -777,3 +780,19 @@ void ImpalaHttpHandler::QuerySummaryHandler(bool include_json_plan, bool include Value json_id(PrintId(query_id).c_str(), document->GetAllocator()); document->AddMember("query_id", json_id, document->GetAllocator()); } + +void ImpalaHttpHandler::BackendsHandler(const Webserver::ArgumentMap& args, + Document* document) { + Value backends_list(kArrayType); + for (const auto& entry : server_->GetKnownBackends()) { + TBackendDescriptor backend = entry.second; + Value backend_obj(kObjectType); + Value str(TNetworkAddressToString(backend.address).c_str(), document->GetAllocator()); + backend_obj.AddMember("address", str, document->GetAllocator()); + backend_obj.AddMember("is_coordinator", backend.is_coordinator, + document->GetAllocator()); + backend_obj.AddMember("is_executor", backend.is_executor, document->GetAllocator()); + backends_list.PushBack(backend_obj, document->GetAllocator()); + } + document->AddMember("backends", backends_list, document->GetAllocator()); +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/service/impala-http-handler.h ---------------------------------------------------------------------- diff --git a/be/src/service/impala-http-handler.h b/be/src/service/impala-http-handler.h index 7be4729..485f6db 100644 --- a/be/src/service/impala-http-handler.h +++ b/be/src/service/impala-http-handler.h @@ -144,6 +144,16 @@ class ImpalaHttpHandler { /// QueryStateHandler(). void QueryStateToJson(const ImpalaServer::QueryStateRecord& record, rapidjson::Value* value, rapidjson::Document* document); + + /// Json callback for /backends, which prints a table of known backends. 
+ /// "backends" : [ + /// { + /// "address": "localhost:21000", + /// "is_coordinator": true, + /// "is_executor": false + /// } + /// ] + void BackendsHandler(const Webserver::ArgumentMap& args, rapidjson::Document* document); }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index 80c1507..86cb0c9 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -86,6 +86,7 @@ using boost::adopt_lock_t; using boost::algorithm::is_any_of; using boost::algorithm::istarts_with; +using boost::algorithm::join; using boost::algorithm::replace_all_copy; using boost::algorithm::split; using boost::algorithm::token_compress_on; @@ -96,6 +97,7 @@ using boost::uuids::uuid; using namespace apache::thrift; using namespace boost::posix_time; using namespace beeswax; +using namespace rapidjson; using namespace strings; DECLARE_int32(be_port); @@ -178,8 +180,9 @@ DEFINE_int32(idle_query_timeout, 0, "The time, in seconds, that a query may be i " the maximum allowable timeout."); DEFINE_bool(is_coordinator, true, "If true, this Impala daemon can accept and coordinate " - "queries from clients. If false, this daemon will only execute query fragments, and " - "will refuse client connections."); + "queries from clients. If false, it will refuse client connections."); +DEFINE_bool(is_executor, true, "If true, this Impala daemon will execute query " + "fragments."); // TODO: Remove for Impala 3.0. DEFINE_string(local_nodemanager_url, "", "Deprecated"); @@ -370,6 +373,7 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env) bind<void>(&ImpalaServer::ExpireQueries, this))); is_coordinator_ = FLAGS_is_coordinator; + is_executor_ = FLAGS_is_executor; exec_env_->SetImpalaServer(this); } @@ -405,6 +409,11 @@ Status ImpalaServer::LogLineageRecord(const QueryExecState& query_exec_state) { } bool ImpalaServer::IsCoordinator() { return is_coordinator_; } +bool ImpalaServer::IsExecutor() { return is_executor_; } + +const ImpalaServer::BackendDescriptorMap& ImpalaServer::GetKnownBackends() { + return known_backends_; +} bool ImpalaServer::IsLineageLoggingEnabled() { return !FLAGS_lineage_event_log_dir.empty(); @@ -1585,6 +1594,8 @@ void ImpalaServer::AddLocalBackendToStatestore( if (known_backends_.find(local_backend_id) != known_backends_.end()) return; TBackendDescriptor local_backend_descriptor; + local_backend_descriptor.__set_is_coordinator(FLAGS_is_coordinator); + local_backend_descriptor.__set_is_executor(FLAGS_is_executor); local_backend_descriptor.__set_address( MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)); IpAddr ip; @@ -1889,6 +1900,9 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int DCHECK((beeswax_port == 0) == (beeswax_server == nullptr)); DCHECK((hs2_port == 0) == (hs2_server == nullptr)); DCHECK((be_port == 0) == (be_server == nullptr)); + if (!FLAGS_is_coordinator && !FLAGS_is_executor) { + return Status("Impala server needs to have a role (EXECUTOR, COORDINATOR)"); + } impala_server->reset(new ImpalaServer(exec_env)); @@ -1910,8 +1924,10 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int LOG(INFO) << "ImpalaInternalService listening on " << be_port; } + if (!FLAGS_is_coordinator) { - LOG(INFO) << "Started worker Impala server on " + + LOG(INFO) << "Started executor Impala server on " << 
ExecEnv::GetInstance()->backend_address(); return Status::OK(); } @@ -1961,7 +1977,7 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int LOG(INFO) << "Impala HiveServer2 Service listening on " << hs2_port; } - LOG(INFO) << "Started coordinator Impala server on " + LOG(INFO) << "Started coordinator/executor Impala server on " << ExecEnv::GetInstance()->backend_address(); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/service/impala-server.h ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h index 45e8080..34e204d 100644 --- a/be/src/service/impala-server.h +++ b/be/src/service/impala-server.h @@ -273,6 +273,12 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// Retuns true if this is a coordinator, false otherwise. bool IsCoordinator(); + /// Returns true if this is an executor, false otherwise. + bool IsExecutor(); + + typedef boost::unordered_map<std::string, TBackendDescriptor> BackendDescriptorMap; + const BackendDescriptorMap& GetKnownBackends(); + /// The prefix of audit event log filename. static const string AUDIT_EVENT_LOG_FILE_PREFIX; @@ -853,16 +859,17 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, QueryLocations; QueryLocations query_locations_; - /// A map from unique backend ID to the corresponding TBackendDescriptor of that backend. - /// Used to track membership updates from the statestore so queries can be cancelled - /// when a backend is removed. It's not enough to just cancel fragments that are running - /// based on the deletions mentioned in the most recent statestore heartbeat; sometimes - /// cancellations are skipped and the statestore, at its discretion, may send only - /// a delta of the current membership so we need to compute any deletions. + /// A map from unique backend ID to the corresponding TBackendDescriptor of that + /// backend. Used to track membership updates from the statestore so queries can be + /// cancelled when a backend is removed. It's not enough to just cancel fragments that + /// are running based on the deletions mentioned in the most recent statestore + /// heartbeat; sometimes cancellations are skipped and the statestore, at its + /// discretion, may send only a delta of the current membership so we need to compute + /// any deletions. /// TODO: Currently there are multiple locations where cluster membership is tracked, - /// here and in the scheduler. This should be consolidated so there is a single component - /// (the scheduler?) that tracks this information and calls other interested components. - typedef boost::unordered_map<std::string, TBackendDescriptor> BackendDescriptorMap; + /// here and in the scheduler. This should be consolidated so there is a single + /// component (the scheduler?) that tracks this information and calls other interested + /// components. BackendDescriptorMap known_backends_; /// Generate unique session id for HiveServer2 session @@ -956,6 +963,9 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// True if this ImpalaServer can accept client connections and coordinate /// queries. bool is_coordinator_; + + /// True if this ImpalaServer can execute query fragments. + bool is_executor_; }; /// Create an ImpalaServer and Thrift servers. 
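For quick reference, the two startup flags shown above reduce to a small set of valid roles: a daemon can be a coordinator, an executor, or both, and CreateImpalaServer() now rejects the combination where both flags are false. The Python sketch below only summarizes that rule; the helper name impalad_role is illustrative and is not part of the patch.

def impalad_role(is_coordinator=True, is_executor=True):
    """Illustrative summary of the role rules implied by the new flags (not actual Impala code)."""
    if not is_coordinator and not is_executor:
        # Mirrors the startup check added to CreateImpalaServer().
        raise ValueError("Impala server needs to have a role (EXECUTOR, COORDINATOR)")
    if is_coordinator and is_executor:
        return "coordinator + executor"   # the default: both flags are true
    return "dedicated coordinator" if is_coordinator else "executor only"

With the default flags a daemon keeps today's behavior; passing -is_executor=false yields a dedicated coordinator that is excluded from the scheduler's executor configuration.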
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/util/network-util.cc ---------------------------------------------------------------------- diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc index 0a722cc..4382e99 100644 --- a/be/src/util/network-util.cc +++ b/be/src/util/network-util.cc @@ -154,6 +154,8 @@ TBackendDescriptor MakeBackendDescriptor(const Hostname& hostname, const IpAddr& TBackendDescriptor be_desc; be_desc.address = MakeNetworkAddress(hostname, port); be_desc.ip_address = ip; + be_desc.is_coordinator = true; + be_desc.is_executor = true; return be_desc; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/be/src/util/webserver.cc ---------------------------------------------------------------------- diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc index 3b7c4f9..3e587a2 100644 --- a/be/src/util/webserver.cc +++ b/be/src/util/webserver.cc @@ -56,6 +56,7 @@ typedef sig_t sighandler_t; #endif using boost::algorithm::is_any_of; +using boost::algorithm::join; using boost::algorithm::split; using boost::algorithm::trim_right; using boost::algorithm::to_lower; @@ -193,10 +194,11 @@ void Webserver::RootHandler(const ArgumentMap& args, Document* document) { ExecEnv* env = ExecEnv::GetInstance(); if (env == nullptr || env->impala_server() == nullptr) return; - string mode = (env->impala_server()->IsCoordinator()) ? - "Coordinator + Executor" : "Executor"; - Value impala_server_mode(mode.c_str(), document->GetAllocator()); - document->AddMember("impala_server_mode", impala_server_mode, document->GetAllocator()); + document->AddMember("impala_server_mode", true, document->GetAllocator()); + document->AddMember("is_coordinator", env->impala_server()->IsCoordinator(), + document->GetAllocator()); + document->AddMember("is_executor", env->impala_server()->IsExecutor(), + document->GetAllocator()); } void Webserver::ErrorHandler(const ArgumentMap& args, Document* document) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/bin/start-impala-cluster.py ---------------------------------------------------------------------- diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py index 37671e3..2b42959 100755 --- a/bin/start-impala-cluster.py +++ b/bin/start-impala-cluster.py @@ -38,6 +38,10 @@ parser.add_option("-s", "--cluster_size", type="int", dest="cluster_size", defau help="Size of the cluster (number of impalad instances to start).") parser.add_option("-c", "--num_coordinators", type="int", dest="num_coordinators", default=3, help="Number of coordinators.") +parser.add_option("--use_exclusive_coordinators", dest="use_exclusive_coordinators", + action="store_true", default=False, help="If true, coordinators only " + "coordinate queries and execute coordinator fragments. 
If false, " + "coordinators also act as executors.") parser.add_option("--build_type", dest="build_type", default= 'latest', help="Build type to use - debug / release / latest") parser.add_option("--impalad_args", dest="impalad_args", action="append", type="string", @@ -50,7 +54,7 @@ parser.add_option("--catalogd_args", dest="catalogd_args", action="append", type="string", default=[], help="Additional arguments to pass to the Catalog Service at startup") parser.add_option("--kill", "--kill_only", dest="kill_only", action="store_true", - default=False, help="Instead of starting the cluster, just kill all"\ + default=False, help="Instead of starting the cluster, just kill all" " the running impalads and the statestored.") parser.add_option("--force_kill", dest="force_kill", action="store_true", default=False, help="Force kill impalad and statestore processes.") @@ -67,7 +71,7 @@ parser.add_option('--max_log_files', default=DEFAULT_IMPALA_MAX_LOG_FILES, parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False, help="Prints all output to stderr/stdout.") parser.add_option("--wait_for_cluster", dest="wait_for_cluster", action="store_true", - default=False, help="Wait until the cluster is ready to accept "\ + default=False, help="Wait until the cluster is ready to accept " "queries before returning.") parser.add_option("--log_level", type="int", dest="log_level", default=1, help="Set the impalad backend logging level") @@ -206,7 +210,10 @@ def build_jvm_args(instance_num): BASE_JVM_DEBUG_PORT = 30000 return JVM_ARGS % (BASE_JVM_DEBUG_PORT + instance_num, options.jvm_args) -def start_impalad_instances(cluster_size, num_coordinators): +def start_impalad_instances(cluster_size, num_coordinators, use_exclusive_coordinators): + """Start 'cluster_size' impalad instances. The first 'num_coordinator' instances will + act as coordinators. 'use_exclusive_coordinators' specifies whether the coordinators + will only execute coordinator fragments.""" if cluster_size == 0: # No impalad instances should be started. return @@ -244,6 +251,9 @@ def start_impalad_instances(cluster_size, num_coordinators): if i >= num_coordinators: args = "-is_coordinator=false %s" % (args) + elif use_exclusive_coordinators: + # Coordinator instance that doesn't execute non-coordinator fragments + args = "-is_executor=false %s" % (args) stderr_log_file_path = os.path.join(options.log_dir, '%s-error.log' % service_name) exec_impala_process(IMPALAD_PATH, args, stderr_log_file_path) @@ -285,9 +295,9 @@ def wait_for_cluster_web(timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS): # impalad processes may take a while to come up. 
wait_for_impala_process_count(impala_cluster) for impalad in impala_cluster.impalads: + impalad.service.wait_for_num_known_live_backends(options.cluster_size, + timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2) if impalad._get_arg_value('is_coordinator', default='true') == 'true': - impalad.service.wait_for_num_known_live_backends(options.cluster_size, - timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2) wait_for_catalog(impalad, timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS) def wait_for_catalog(impalad, timeout_in_seconds): @@ -337,6 +347,10 @@ if __name__ == "__main__": print 'Please specify a valid number of coordinators > 0' sys.exit(1) + if options.use_exclusive_coordinators and options.num_coordinators >= options.cluster_size: + print 'Cannot start an Impala cluster with no executors' + sys.exit(1) + if not os.path.isdir(options.log_dir): print 'Log dir does not exist or is not a directory: %s' % options.log_dir sys.exit(1) @@ -383,7 +397,8 @@ if __name__ == "__main__": if not options.restart_impalad_only: start_statestore() start_catalogd() - start_impalad_instances(options.cluster_size, options.num_coordinators) + start_impalad_instances(options.cluster_size, options.num_coordinators, + options.use_exclusive_coordinators) # Sleep briefly to reduce log spam: the cluster takes some time to start up. sleep(3) wait_for_cluster() http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/common/thrift/StatestoreService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift index 1677635..90400b2 100644 --- a/common/thrift/StatestoreService.thrift +++ b/common/thrift/StatestoreService.thrift @@ -47,7 +47,8 @@ struct TPoolStats { // Structure serialised in the Impala backend topic. Each Impalad // constructs one TBackendDescriptor, and registers it in the backend // topic. Impalads subscribe to this topic to learn of the location of -// all other Impalads in the cluster. +// all other Impalads in the cluster. Impalads can act as coordinators, executors or +// both. 
struct TBackendDescriptor { // Network address of the Impala service on this backend 1: required Types.TNetworkAddress address; @@ -56,11 +57,17 @@ struct TBackendDescriptor { // cost of resolution at every Impalad (since IP addresses are needed for scheduling) 2: required string ip_address; + // True if this is a coordinator node + 3: required bool is_coordinator; + + // True if this is an executor node + 4: required bool is_executor; + // The address of the debug HTTP server - 3: optional Types.TNetworkAddress debug_http_address; + 5: optional Types.TNetworkAddress debug_http_address; // True if the debug webserver is secured (for correctly generating links) - 4: optional bool secure_webserver; + 6: optional bool secure_webserver; } // Description of a single entry in a topic http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/tests/common/custom_cluster_test_suite.py ---------------------------------------------------------------------- diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py index b19b5d2..377b38e 100644 --- a/tests/common/custom_cluster_test_suite.py +++ b/tests/common/custom_cluster_test_suite.py @@ -108,13 +108,15 @@ class CustomClusterTestSuite(ImpalaTestSuite): @classmethod def _start_impala_cluster(cls, options, log_dir=os.getenv('LOG_DIR', "/tmp/"), - cluster_size=CLUSTER_SIZE, num_coordinators=NUM_COORDINATORS, log_level=1): + cluster_size=CLUSTER_SIZE, num_coordinators=NUM_COORDINATORS, + use_exclusive_coordinators=False, log_level=1): cls.impala_log_dir = log_dir cmd = [os.path.join(IMPALA_HOME, 'bin/start-impala-cluster.py'), '--cluster_size=%d' % cluster_size, '--num_coordinators=%d' % num_coordinators, '--log_dir=%s' % log_dir, '--log_level=%s' % log_level] + if use_exclusive_coordinators: cmd.append("--use_exclusive_coordinators") try: check_call(cmd + options, close_fds=True) finally: @@ -125,8 +127,7 @@ class CustomClusterTestSuite(ImpalaTestSuite): raise Exception("statestored was not found") statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS, timeout=60) for impalad in cls.cluster.impalads: - if impalad._get_arg_value('is_coordinator', default='true') == 'true': - impalad.service.wait_for_num_known_live_backends(CLUSTER_SIZE, timeout=60) + impalad.service.wait_for_num_known_live_backends(CLUSTER_SIZE, timeout=60) def assert_impalad_log_contains(self, level, line_regex, expected_count=1): """ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/tests/custom_cluster/test_coordinators.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_coordinators.py b/tests/custom_cluster/test_coordinators.py index 6010404..cac4512 100644 --- a/tests/custom_cluster/test_coordinators.py +++ b/tests/custom_cluster/test_coordinators.py @@ -84,3 +84,37 @@ class TestCoordinators(CustomClusterTestSuite): assert num_tbls == 0 client1.close() client2.close() + + @pytest.mark.execute_serially + def test_single_coordinator_cluster_config(self): + """Test a cluster configuration with a single coordinator.""" + + def exec_and_verify_num_executors(expected_num_of_executors): + """Connects to the coordinator node, runs a query and verifies that certain + operators are executed on 'expected_num_of_executors' nodes.""" + coordinator = self.cluster.impalads[0] + try: + client = coordinator.service.create_beeswax_client() + assert client is not None + query = "select count(*) from functional.alltypesagg" + result = 
self.execute_query_expect_success(client, query) + # Verify that SCAN and AGG are executed on the expected number of + # executor nodes + for rows in result.exec_summary: + if rows['operator'] == '00:SCAN HDFS': + assert rows['num_hosts'] == expected_num_of_executors + elif rows['operator'] == '01:AGGREGATE': + assert rows['num_hosts'] == expected_num_of_executors + finally: + client.close() + + # Cluster config where the coordinator can execute query fragments + self._start_impala_cluster([], cluster_size=3, num_coordinators=1, + use_exclusive_coordinators=False) + exec_and_verify_num_executors(3) + # Stop the cluster + self._stop_impala_cluster() + # Cluster config where the coordinator can only execute coordinator fragments + self._start_impala_cluster([], cluster_size=3, num_coordinators=1, + use_exclusive_coordinators=True) + exec_and_verify_num_executors(2) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/www/backends.tmpl ---------------------------------------------------------------------- diff --git a/www/backends.tmpl b/www/backends.tmpl index cef77b7..20cfa11 100644 --- a/www/backends.tmpl +++ b/www/backends.tmpl @@ -23,13 +23,17 @@ under the License. <table id="backends" class='table table-hover table-bordered'> <thead> <tr> - <th>Backend</th> + <th>Address</th> + <th>Coordinator</th> + <th>Executor</th> </tr> </thead> <tbody> {{#backends}} <tr> - <td>{{.}}</td> + <td>{{address}}</td> + <td>{{is_coordinator}}</td> + <td>{{is_executor}}</td> </tr> {{/backends}} </tbody> http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e2c53a8b/www/root.tmpl ---------------------------------------------------------------------- diff --git a/www/root.tmpl b/www/root.tmpl index 40d448a..d9c6d2a 100644 --- a/www/root.tmpl +++ b/www/root.tmpl @@ -19,7 +19,8 @@ under the License. {{! Template for / }} {{>www/common-header.tmpl}} {{?impala_server_mode}} - <h2>Impala Server Mode: {{impala_server_mode}}</h2> + <h2>Impala Server Mode: {{?is_coordinator}}Coordinator{{/is_coordinator}} + {{?is_executor}}Executor{{/is_executor}}</h2> {{/impala_server_mode}} <h2>Vers<span id="v">i</span>on</h2>
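The new /backends debug page gives a way to confirm the role split from outside the cluster. The following sketch is an example under assumptions, not part of the patch: it presumes the third-party 'requests' package and that the debug webserver's 'json' query argument returns the raw document produced by BackendsHandler(); the helper name and default port are illustrative.

import requests

def list_backend_roles(webserver_url="http://localhost:25000"):
    # Fetch the JSON document behind www/backends.tmpl (assumes '?json' is honored)
    # and print each backend's address together with its coordinator/executor roles.
    doc = requests.get(webserver_url + "/backends?json").json()
    for be in doc.get("backends", []):
        print("%s coordinator=%s executor=%s"
              % (be["address"], be["is_coordinator"], be["is_executor"]))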

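As a closing usage note, the per-instance flag selection in start_impalad_instances() can be restated in a few lines; role_flags below is a simplified illustration, not the script's actual code.

def role_flags(instance_idx, num_coordinators, use_exclusive_coordinators):
    # Instances past the first 'num_coordinators' become executor-only daemons.
    if instance_idx >= num_coordinators:
        return ["-is_coordinator=false"]
    # With --use_exclusive_coordinators, coordinators run only coordinator fragments.
    if use_exclusive_coordinators:
        return ["-is_executor=false"]
    # Default: the instance is both a coordinator and an executor.
    return []

For example, a three-node cluster started with one exclusive coordinator ([role_flags(i, 1, True) for i in range(3)]) yields ['-is_executor=false'] for the coordinator and ['-is_coordinator=false'] for the two executors, which matches the configuration exercised by test_single_coordinator_cluster_config.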