IMPALA-4041: Limit catalog and admission control updates to coordinators

With this commit we add the ability to restrict catalog updates to a subset of nodes that act as coordinators. A new startup option, 'is_coordinator' (default: true), indicates whether a node is a coordinator. Coordinators accept client connections through the HS2 and Beeswax interfaces and can also participate in query execution. Non-coordinator nodes do not receive catalog updates from the statestore, do not initialize a query scheduler, and cannot accept Beeswax or HS2 client connections.
Testing: - Added a custom cluster test that launches a cluster in which the number of coordinators is less than the cluster size and runs a number of smoke queries. - Successfully run exhaustive tests. Change-Id: I5f2c74abdbcd60ac050efa323616bd41182ceff3 Reviewed-on: http://gerrit.cloudera.org:8080/6344 Reviewed-by: Dimitris Tsirogiannis <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/296df3c8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/296df3c8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/296df3c8 Branch: refs/heads/master Commit: 296df3c8269b7be16120397d60e97b6f24fc390b Parents: 4aa8e2d Author: Dimitris Tsirogiannis <[email protected]> Authored: Thu Mar 9 15:24:51 2017 -0800 Committer: Impala Public Jenkins <[email protected]> Committed: Tue Mar 28 22:27:25 2017 +0000 ---------------------------------------------------------------------- be/src/runtime/exec-env.cc | 53 +++++--- be/src/runtime/exec-env.h | 7 +- be/src/scheduling/admission-controller.cc | 26 ++-- be/src/scheduling/admission-controller.h | 11 +- be/src/scheduling/scheduler-test-util.cc | 16 +-- be/src/scheduling/scheduler-test.cc | 9 +- be/src/scheduling/scheduler.cc | 89 +++---------- be/src/scheduling/scheduler.h | 8 +- be/src/service/impala-server.cc | 173 +++++++++++++++++-------- be/src/service/impala-server.h | 18 ++- be/src/service/impalad-main.cc | 24 ++-- be/src/service/query-exec-state.cc | 19 ++- be/src/testutil/in-process-servers.h | 9 +- be/src/util/webserver.cc | 33 +++-- bin/start-impala-cluster.py | 24 +++- tests/common/custom_cluster_test_suite.py | 7 +- tests/common/impala_service.py | 14 ++ tests/custom_cluster/test_coordinators.py | 86 ++++++++++++ www/root.tmpl | 4 + 19 files changed, 415 insertions(+), 215 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/runtime/exec-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc index 611520c..dd27a91 100644 --- a/be/src/runtime/exec-env.cc +++ b/be/src/runtime/exec-env.cc @@ -40,6 +40,7 @@ #include "runtime/query-exec-mgr.h" #include "runtime/thread-resource-mgr.h" #include "runtime/tmp-file-mgr.h" +#include "scheduling/admission-controller.h" #include "scheduling/request-pool-service.h" #include "scheduling/scheduler.h" #include "service/frontend.h" @@ -79,12 +80,14 @@ DEFINE_int32(state_store_subscriber_port, 23000, "port where StatestoreSubscriberService should be exported"); DEFINE_int32(num_hdfs_worker_threads, 16, "(Advanced) The number of threads in the global HDFS operation pool"); +DEFINE_bool(disable_admission_control, false, "Disables admission control."); DECLARE_int32(state_store_port); DECLARE_int32(num_threads_per_core); DECLARE_int32(num_cores); DECLARE_int32(be_port); DECLARE_string(mem_limit); +DECLARE_bool(is_coordinator); // TODO: Remove the following RM-related flags in Impala 3.0. 
DEFINE_bool(enable_rm, false, "Deprecated"); @@ -123,7 +126,7 @@ const static string DEFAULT_FS = "fs.defaultFS"; namespace impala { -ExecEnv* ExecEnv::exec_env_ = NULL; +ExecEnv* ExecEnv::exec_env_ = nullptr; ExecEnv::ExecEnv() : metrics_(new MetricGroup("impala-metrics")), @@ -139,7 +142,7 @@ ExecEnv::ExecEnv() htable_factory_(new HBaseTableFactory()), disk_io_mgr_(new DiskIoMgr()), webserver_(new Webserver()), - mem_tracker_(NULL), + mem_tracker_(nullptr), pool_mem_trackers_(new PoolMemTrackerRegistry), thread_mgr_(new ThreadResourceMgr), hdfs_op_thread_pool_( @@ -168,10 +171,19 @@ ExecEnv::ExecEnv() Substitute("impalad@$0", TNetworkAddressToString(backend_address_)), subscriber_address, statestore_address, metrics_.get())); - scheduler_.reset(new Scheduler(statestore_subscriber_.get(), - statestore_subscriber_->id(), backend_address_, metrics_.get(), webserver_.get(), - request_pool_service_.get())); - } else { + if (FLAGS_is_coordinator) { + scheduler_.reset(new Scheduler(statestore_subscriber_.get(), + statestore_subscriber_->id(), backend_address_, metrics_.get(), + webserver_.get(), request_pool_service_.get())); + } + + if (!FLAGS_disable_admission_control) { + admission_controller_.reset(new AdmissionController(statestore_subscriber_.get(), + request_pool_service_.get(), metrics_.get(), backend_address_)); + } else { + LOG(INFO) << "Admission control is disabled."; + } + } else if (FLAGS_is_coordinator) { vector<TNetworkAddress> addresses; addresses.push_back(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)); scheduler_.reset(new Scheduler( @@ -196,7 +208,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port, htable_factory_(new HBaseTableFactory()), disk_io_mgr_(new DiskIoMgr()), webserver_(new Webserver(webserver_port)), - mem_tracker_(NULL), + mem_tracker_(nullptr), pool_mem_trackers_(new PoolMemTrackerRegistry), thread_mgr_(new ThreadResourceMgr), hdfs_op_thread_pool_( @@ -208,7 +220,7 @@ ExecEnv::ExecEnv(const string& 
hostname, int backend_port, int subscriber_port, async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)), query_exec_mgr_(new QueryExecMgr()), buffer_reservation_(nullptr), - buffer_pool_(NULL), + buffer_pool_(nullptr), enable_webserver_(FLAGS_enable_webserver && webserver_port > 0), is_fe_tests_(false), backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) { @@ -224,10 +236,18 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port, Substitute("impalad@$0", TNetworkAddressToString(backend_address_)), subscriber_address, statestore_address, metrics_.get())); - scheduler_.reset(new Scheduler(statestore_subscriber_.get(), - statestore_subscriber_->id(), backend_address_, metrics_.get(), webserver_.get(), - request_pool_service_.get())); - } else { + if (FLAGS_is_coordinator) { + scheduler_.reset(new Scheduler(statestore_subscriber_.get(), + statestore_subscriber_->id(), backend_address_, metrics_.get(), + webserver_.get(), request_pool_service_.get())); + } + + if (FLAGS_disable_admission_control) LOG(INFO) << "Admission control is disabled."; + if (!FLAGS_disable_admission_control) { + admission_controller_.reset(new AdmissionController(statestore_subscriber_.get(), + request_pool_service_.get(), metrics_.get(), backend_address_)); + } + } else if (FLAGS_is_coordinator) { vector<TNetworkAddress> addresses; addresses.push_back(MakeNetworkAddress(hostname, backend_port)); scheduler_.reset(new Scheduler( @@ -284,7 +304,7 @@ Status ExecEnv::StartServices() { return Status("Failed to parse mem limit from '" + FLAGS_mem_limit + "'."); } - metrics_->Init(enable_webserver_ ? webserver_.get() : NULL); + metrics_->Init(enable_webserver_ ? 
webserver_.get() : nullptr); impalad_client_cache_->InitMetrics(metrics_.get(), "impala-server.backends"); catalogd_client_cache_->InitMetrics(metrics_.get(), "catalog.server"); RETURN_IF_ERROR(RegisterMemoryMetrics(metrics_.get(), true)); @@ -326,7 +346,8 @@ Status ExecEnv::StartServices() { LOG(INFO) << "Not starting webserver"; } - if (scheduler_ != NULL) RETURN_IF_ERROR(scheduler_->Init()); + if (scheduler_ != nullptr) RETURN_IF_ERROR(scheduler_->Init()); + if (admission_controller_ != nullptr) RETURN_IF_ERROR(admission_controller_->Init()); // Get the fs.defaultFS value set in core-site.xml and assign it to // configured_defaultFs @@ -340,7 +361,7 @@ Status ExecEnv::StartServices() { default_fs_ = "hdfs://"; } // Must happen after all topic registrations / callbacks are done - if (statestore_subscriber_.get() != NULL) { + if (statestore_subscriber_.get() != nullptr) { Status status = statestore_subscriber_->Start(); if (!status.ok()) { status.AddDetail("Statestore subscriber did not start up."); @@ -355,6 +376,6 @@ void ExecEnv::InitBufferPool(int64_t min_page_size, int64_t capacity) { DCHECK(buffer_pool_ == nullptr); buffer_pool_.reset(new BufferPool(min_page_size, capacity)); buffer_reservation_.reset(new ReservationTracker()); - buffer_reservation_->InitRootTracker(NULL, capacity); + buffer_reservation_->InitRootTracker(nullptr, capacity); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/runtime/exec-env.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h index a5777ef..87824ba 100644 --- a/be/src/runtime/exec-env.h +++ b/be/src/runtime/exec-env.h @@ -28,6 +28,7 @@ namespace impala { +class AdmissionController; class BufferPool; class CallableThreadPool; class DataStreamMgr; @@ -72,10 +73,6 @@ class ExecEnv { void SetImpalaServer(ImpalaServer* server) { impala_server_ = server; } - StatestoreSubscriber* statestore_subscriber() { - return 
statestore_subscriber_.get(); - } - DataStreamMgr* stream_mgr() { return stream_mgr_.get(); } ImpalaBackendClientCache* impalad_client_cache() { return impalad_client_cache_.get(); @@ -106,6 +103,7 @@ class ExecEnv { void set_enable_webserver(bool enable) { enable_webserver_ = enable; } Scheduler* scheduler() { return scheduler_.get(); } + AdmissionController* admission_controller() { return admission_controller_.get(); } StatestoreSubscriber* subscriber() { return statestore_subscriber_.get(); } const TNetworkAddress& backend_address() const { return backend_address_; } @@ -129,6 +127,7 @@ class ExecEnv { boost::scoped_ptr<MetricGroup> metrics_; boost::scoped_ptr<DataStreamMgr> stream_mgr_; boost::scoped_ptr<Scheduler> scheduler_; + boost::scoped_ptr<AdmissionController> admission_controller_; boost::scoped_ptr<StatestoreSubscriber> statestore_subscriber_; boost::scoped_ptr<ImpalaBackendClientCache> impalad_client_cache_; boost::scoped_ptr<CatalogServiceClientCache> catalogd_client_cache_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/scheduling/admission-controller.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc index 3c9947a..efec78a 100644 --- a/be/src/scheduling/admission-controller.cc +++ b/be/src/scheduling/admission-controller.cc @@ -199,9 +199,11 @@ string AdmissionController::PoolStats::DebugString() const { // TODO: do we need host_id_ to come from host_addr or can it just take the same id // the Scheduler has (coming from the StatestoreSubscriber)? 
-AdmissionController::AdmissionController(RequestPoolService* request_pool_service, - MetricGroup* metrics, const TNetworkAddress& host_addr) - : request_pool_service_(request_pool_service), +AdmissionController::AdmissionController(StatestoreSubscriber* subscriber, + RequestPoolService* request_pool_service, MetricGroup* metrics, + const TNetworkAddress& host_addr) + : subscriber_(subscriber), + request_pool_service_(request_pool_service), metrics_group_(metrics), host_id_(TNetworkAddressToString(host_addr)), thrift_serializer_(false), @@ -224,10 +226,10 @@ AdmissionController::~AdmissionController() { dequeue_thread_->Join(); } -Status AdmissionController::Init(StatestoreSubscriber* subscriber) { +Status AdmissionController::Init() { StatestoreSubscriber::UpdateCallback cb = bind<void>(mem_fn(&AdmissionController::UpdatePoolStats), this, _1, _2); - Status status = subscriber->AddTopic(IMPALA_REQUEST_QUEUE_TOPIC, true, cb); + Status status = subscriber_->AddTopic(IMPALA_REQUEST_QUEUE_TOPIC, true, cb); if (!status.ok()) { status.AddDetail("AdmissionController failed to register request queue topic"); } @@ -578,12 +580,12 @@ void AdmissionController::PoolStats::UpdateRemoteStats(const string& host_id, if (VLOG_ROW_IS_ON) { stringstream ss; ss << "Stats update for pool=" << name_ << " backend=" << host_id; - if (host_stats == NULL) ss << " topic deletion"; + if (host_stats == nullptr) ss << " topic deletion"; if (it != remote_stats_.end()) ss << " previous: " << DebugPoolStats(it->second); - if (host_stats != NULL) ss << " new: " << DebugPoolStats(*host_stats); + if (host_stats != nullptr) ss << " new: " << DebugPoolStats(*host_stats); VLOG_ROW << ss.str(); } - if (host_stats == NULL) { + if (host_stats == nullptr) { if (it != remote_stats_.end()) { remote_stats_.erase(it); } else { @@ -620,7 +622,7 @@ void AdmissionController::HandleTopicDeletions(const vector<string>& topic_delet string topic_backend_id; if (!ParsePoolTopicKey(topic_key, &pool_name, 
&topic_backend_id)) continue; if (topic_backend_id == host_id_) continue; - GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, NULL); + GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, nullptr); } } @@ -705,7 +707,7 @@ void AdmissionController::PoolStats::UpdateMemTrackerStats() { ExecEnv::GetInstance()->pool_mem_trackers()->GetRequestPoolMemTracker(name_, false); const int64_t current_reserved = - tracker == NULL ? static_cast<int64_t>(0) : tracker->GetPoolMemReserved(); + tracker == nullptr ? static_cast<int64_t>(0) : tracker->GetPoolMemReserved(); if (current_reserved != local_stats_.backend_mem_reserved) { parent_->pools_for_updates_.insert(name_); local_stats_.backend_mem_reserved = current_reserved; @@ -713,7 +715,7 @@ void AdmissionController::PoolStats::UpdateMemTrackerStats() { } const int64_t current_usage = - tracker == NULL ? static_cast<int64_t>(0) : tracker->consumption(); + tracker == nullptr ? static_cast<int64_t>(0) : tracker->consumption(); metrics_.local_backend_mem_usage->set_value(current_usage); } @@ -797,7 +799,7 @@ void AdmissionController::DequeueLoop() { while (max_to_dequeue > 0 && !queue.empty()) { QueueNode* queue_node = queue.head(); - DCHECK(queue_node != NULL); + DCHECK(queue_node != nullptr); DCHECK(!queue_node->is_admitted.IsSet()); const QuerySchedule& schedule = queue_node->schedule; string not_admitted_reason; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/scheduling/admission-controller.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h index f0a37d8..d3ec49d 100644 --- a/be/src/scheduling/admission-controller.h +++ b/be/src/scheduling/admission-controller.h @@ -178,7 +178,8 @@ class ExecEnv; /// better idea of what is perhaps unnecessary. 
class AdmissionController { public: - AdmissionController(RequestPoolService* request_pool_service, MetricGroup* metrics, + AdmissionController(StatestoreSubscriber* subscriber, + RequestPoolService* request_pool_service, MetricGroup* metrics, const TNetworkAddress& host_addr); ~AdmissionController(); @@ -196,8 +197,8 @@ class AdmissionController { /// This does not block. Status ReleaseQuery(QuerySchedule* schedule); - /// Registers with the subscription manager. - Status Init(StatestoreSubscriber* subscriber); + /// Registers the request queue topic with the statestore. + Status Init(); private: class PoolStats; @@ -206,6 +207,10 @@ class AdmissionController { /// Statestore topic name. static const std::string IMPALA_REQUEST_QUEUE_TOPIC; + /// Subscription manager used to handle admission control updates. This is not + /// owned by this class. + StatestoreSubscriber* subscriber_; + /// Used for user-to-pool resolution and looking up pool configurations. Not owned by /// the AdmissionController. RequestPoolService* request_pool_service_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/scheduling/scheduler-test-util.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc index 2474f33..782379c 100644 --- a/be/src/scheduling/scheduler-test-util.cc +++ b/be/src/scheduling/scheduler-test-util.cc @@ -450,13 +450,13 @@ SchedulerWrapper::SchedulerWrapper(const Plan& plan) } Status SchedulerWrapper::Compute(bool exec_at_coord, Result* result) { - DCHECK(scheduler_ != NULL); + DCHECK(scheduler_ != nullptr); // Compute Assignment. 
FragmentScanRangeAssignment* assignment = result->AddAssignment(); - return scheduler_->ComputeScanRangeAssignment(*scheduler_->GetBackendConfig(), 0, NULL, - false, plan_.scan_range_locations(), plan_.referenced_datanodes(), exec_at_coord, - plan_.query_options(), NULL, assignment); + return scheduler_->ComputeScanRangeAssignment(*scheduler_->GetBackendConfig(), 0, + nullptr, false, plan_.scan_range_locations(), plan_.referenced_datanodes(), + exec_at_coord, plan_.query_options(), nullptr, assignment); } void SchedulerWrapper::AddBackend(const Host& host) { @@ -495,7 +495,7 @@ void SchedulerWrapper::SendEmptyUpdate() { } void SchedulerWrapper::InitializeScheduler() { - DCHECK(scheduler_ == NULL); + DCHECK(scheduler_ == nullptr); DCHECK_GT(plan_.cluster().NumHosts(), 0) << "Cannot initialize scheduler with 0 " << "hosts."; const Host& scheduler_host = plan_.cluster().hosts()[0]; @@ -504,8 +504,8 @@ void SchedulerWrapper::InitializeScheduler() { scheduler_backend_address.hostname = scheduler_host.ip; scheduler_backend_address.port = scheduler_host.be_port; - scheduler_.reset(new Scheduler( - NULL, scheduler_backend_id, scheduler_backend_address, &metrics_, NULL, NULL)); + scheduler_.reset(new Scheduler(nullptr, scheduler_backend_id, scheduler_backend_address, + &metrics_, nullptr, nullptr)); scheduler_->Init(); // Initialize the scheduler backend maps. SendFullMembershipMap(); @@ -532,7 +532,7 @@ void SchedulerWrapper::AddHostToTopicDelta(const Host& host, TTopicDelta* delta) } void SchedulerWrapper::SendTopicDelta(const TTopicDelta& delta) { - DCHECK(scheduler_ != NULL); + DCHECK(scheduler_ != nullptr); // Wrap in topic delta map. 
StatestoreSubscriber::TopicDeltaMap delta_map; delta_map.emplace(Scheduler::IMPALA_MEMBERSHIP_TOPIC, delta); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/scheduling/scheduler-test.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/scheduler-test.cc b/be/src/scheduling/scheduler-test.cc index 272a288..3e05c5b 100644 --- a/be/src/scheduling/scheduler-test.cc +++ b/be/src/scheduling/scheduler-test.cc @@ -362,8 +362,11 @@ TEST_F(SchedulerTest, TestSendUpdates) { } /// IMPALA-4329: Test scheduling with no backends. -/// With the fix for IMPALA-4494, the scheduler will always register its local backend -/// with itself, so scheduling with no backends will still succeed. +/// With the fix for IMPALA-5058, the scheduler is no longer responsible for +/// registering the local backend with itself. This functionality is moved to +/// ImpalaServer::MembershipCallback() and the scheduler will receive the local +/// backend info through the statestore update, so until that happens, scheduling +/// should fail. TEST_F(SchedulerTest, TestEmptyBackendConfig) { Cluster cluster; cluster.AddHost(false, true); @@ -377,7 +380,7 @@ TEST_F(SchedulerTest, TestEmptyBackendConfig) { Result result(plan); SchedulerWrapper scheduler(plan); Status status = scheduler.Compute(&result); - EXPECT_TRUE(status.ok()); + EXPECT_TRUE(!status.ok()); } /// IMPALA-4494: Test scheduling with no backends but exec_at_coord. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/scheduling/scheduler.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc index 70dbcdc..a73e13a 100644 --- a/be/src/scheduling/scheduler.cc +++ b/be/src/scheduling/scheduler.cc @@ -30,6 +30,7 @@ #include "gen-cpp/ImpalaInternalService_constants.h" #include "gen-cpp/Types_types.h" #include "rapidjson/rapidjson.h" +#include "runtime/exec-env.h" #include "statestore/statestore-subscriber.h" #include "util/container-util.h" #include "util/metrics.h" @@ -46,8 +47,6 @@ using namespace strings; DECLARE_int32(be_port); DECLARE_string(hostname); -DEFINE_bool(disable_admission_control, false, "Disables admission control."); - namespace impala { static const string LOCAL_ASSIGNMENTS_KEY("simple-scheduler.local-assignments.total"); @@ -69,17 +68,11 @@ Scheduler::Scheduler(StatestoreSubscriber* subscriber, const string& backend_id, statestore_subscriber_(subscriber), local_backend_id_(backend_id), thrift_serializer_(false), - total_assignments_(NULL), - total_local_assignments_(NULL), - initialized_(NULL), + total_assignments_(nullptr), + total_local_assignments_(nullptr), + initialized_(nullptr), request_pool_service_(request_pool_service) { local_backend_descriptor_.address = backend_address; - - if (FLAGS_disable_admission_control) LOG(INFO) << "Admission control is disabled."; - if (!FLAGS_disable_admission_control) { - admission_controller_.reset( - new AdmissionController(request_pool_service_, metrics, backend_address)); - } } Scheduler::Scheduler(const vector<TNetworkAddress>& backends, MetricGroup* metrics, @@ -87,20 +80,14 @@ Scheduler::Scheduler(const vector<TNetworkAddress>& backends, MetricGroup* metri : backend_config_(std::make_shared<const BackendConfig>(backends)), metrics_(metrics), webserver_(webserver), - statestore_subscriber_(NULL), + statestore_subscriber_(nullptr), 
thrift_serializer_(false), - total_assignments_(NULL), - total_local_assignments_(NULL), - initialized_(NULL), + total_assignments_(nullptr), + total_local_assignments_(nullptr), + initialized_(nullptr), request_pool_service_(request_pool_service) { DCHECK(backends.size() > 0); local_backend_descriptor_.address = MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port); - if (FLAGS_disable_admission_control) LOG(INFO) << "Admission control is disabled."; - // request_pool_service_ may be null in unit tests - if (request_pool_service_ != NULL && !FLAGS_disable_admission_control) { - admission_controller_.reset( - new AdmissionController(request_pool_service_, metrics, TNetworkAddress())); - } } Status Scheduler::Init() { @@ -122,14 +109,14 @@ Status Scheduler::Init() { coord_only_backend_config_.AddBackend(local_backend_descriptor_); - if (webserver_ != NULL) { + if (webserver_ != nullptr) { Webserver::UrlCallback backends_callback = bind<void>(mem_fn(&Scheduler::BackendsUrlCallback), this, _1, _2); webserver_->RegisterUrlCallback( BACKENDS_WEB_PAGE, BACKENDS_TEMPLATE, backends_callback); } - if (statestore_subscriber_ != NULL) { + if (statestore_subscriber_ != nullptr) { StatestoreSubscriber::UpdateCallback cb = bind<void>(mem_fn(&Scheduler::UpdateMembership), this, _1, _2); Status status = statestore_subscriber_->AddTopic(IMPALA_MEMBERSHIP_TOPIC, true, cb); @@ -137,12 +124,9 @@ Status Scheduler::Init() { status.AddDetail("Scheduler failed to register membership topic"); return status; } - if (!FLAGS_disable_admission_control) { - RETURN_IF_ERROR(admission_controller_->Init(statestore_subscriber_)); - } } - if (metrics_ != NULL) { + if (metrics_ != nullptr) { // This is after registering with the statestored, so we already have to synchronize // access to the backend_config_ shared_ptr. 
int num_backends = GetBackendConfig()->NumBackends(); @@ -153,8 +137,8 @@ Status Scheduler::Init() { metrics_->AddGauge<int64_t>(NUM_BACKENDS_KEY, num_backends); } - if (statestore_subscriber_ != NULL) { - if (webserver_ != NULL) { + if (statestore_subscriber_ != nullptr) { + if (webserver_ != nullptr) { const TNetworkAddress& webserver_address = webserver_->http_address(); if (IsWildcardAddress(webserver_address.hostname)) { local_backend_descriptor_.__set_debug_http_address( @@ -253,31 +237,9 @@ void Scheduler::UpdateMembership( } } - // If the local backend is not in our view of the membership list, we should add it - // and tell the statestore. We also ensure that it is part of our backend config. - if (current_membership_.find(local_backend_id_) == current_membership_.end()) { - new_backend_config->AddBackend(local_backend_descriptor_); - VLOG(1) << "Registering local backend with statestore"; - subscriber_topic_updates->push_back(TTopicDelta()); - TTopicDelta& update = subscriber_topic_updates->back(); - update.topic_name = IMPALA_MEMBERSHIP_TOPIC; - update.topic_entries.push_back(TTopicItem()); - - TTopicItem& item = update.topic_entries.back(); - item.key = local_backend_id_; - Status status = thrift_serializer_.Serialize(&local_backend_descriptor_, &item.value); - if (!status.ok()) { - LOG(WARNING) << "Failed to serialize Impala backend address for statestore topic:" - << " " << status.GetDetail(); - subscriber_topic_updates->pop_back(); - } - } - - DCHECK(new_backend_config->LookUpBackendIp( - local_backend_descriptor_.address.hostname, nullptr)); SetBackendConfig(new_backend_config); - if (metrics_ != NULL) { + if (metrics_ != nullptr) { /// TODO-MT: fix this (do we even need to report it?) 
num_fragment_instances_metric_->set_value(current_membership_.size()); } @@ -285,7 +247,7 @@ void Scheduler::UpdateMembership( Scheduler::BackendConfigPtr Scheduler::GetBackendConfig() const { lock_guard<mutex> l(backend_config_lock_); - DCHECK(backend_config_.get() != NULL); + DCHECK(backend_config_.get() != nullptr); BackendConfigPtr backend_config = backend_config_; return backend_config; } @@ -660,7 +622,7 @@ Status Scheduler::ComputeScanRangeAssignment(const BackendConfig& backend_config // cache to worry about. // Remote reads will always break ties by backend rank. bool decide_local_assignment_by_rank = random_replica || cached_replica; - const IpAddr* backend_ip = NULL; + const IpAddr* backend_ip = nullptr; backend_ip = assignment_ctx.SelectLocalBackendHost( backend_candidates, decide_local_assignment_by_rank); TBackendDescriptor backend; @@ -771,17 +733,6 @@ Status Scheduler::Schedule(QuerySchedule* schedule) { } } schedule->SetUniqueHosts(unique_hosts); - - if (!FLAGS_disable_admission_control) { - RETURN_IF_ERROR(admission_controller_->AdmitQuery(schedule)); - } - return Status::OK(); -} - -Status Scheduler::Release(QuerySchedule* schedule) { - if (!FLAGS_disable_admission_control) { - RETURN_IF_ERROR(admission_controller_->ReleaseQuery(schedule)); - } return Status::OK(); } @@ -844,7 +795,7 @@ const IpAddr* Scheduler::AssignmentCtx::SelectRemoteBackendHost() { DCHECK_EQ(backend_config_.NumBackends(), assignment_heap_.size()); candidate_ip = &(assignment_heap_.top().ip); } - DCHECK(candidate_ip != NULL); + DCHECK(candidate_ip != nullptr); return candidate_ip; } @@ -866,7 +817,7 @@ int Scheduler::AssignmentCtx::GetBackendRank(const IpAddr& ip) const { void Scheduler::AssignmentCtx::SelectBackendOnHost( const IpAddr& backend_ip, TBackendDescriptor* backend) { - DCHECK(backend_config_.LookUpBackendIp(backend_ip, NULL)); + DCHECK(backend_config_.LookUpBackendIp(backend_ip, nullptr)); const BackendConfig::BackendList& backends_on_host = 
backend_config_.GetBackendListForHost(backend_ip); DCHECK(backends_on_host.size() > 0); @@ -934,8 +885,8 @@ void Scheduler::AssignmentCtx::RecordScanRangeAssignment( if (is_cached) assignment_byte_counters_.cached_bytes += scan_range_length; } - if (total_assignments_ != NULL) { - DCHECK(total_local_assignments_ != NULL); + if (total_assignments_ != nullptr) { + DCHECK(total_local_assignments_ != nullptr); total_assignments_->Increment(1); if (!remote_read) total_local_assignments_->Increment(1); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/scheduling/scheduler.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h index bd2aff4..ca520c8 100644 --- a/be/src/scheduling/scheduler.h +++ b/be/src/scheduling/scheduler.h @@ -33,9 +33,9 @@ #include "gen-cpp/Types_types.h" // for TNetworkAddress #include "rapidjson/document.h" #include "rpc/thrift-util.h" -#include "scheduling/admission-controller.h" #include "scheduling/backend-config.h" #include "scheduling/query-schedule.h" +#include "scheduling/request-pool-service.h" #include "statestore/statestore-subscriber.h" #include "util/metrics.h" #include "util/network-util.h" @@ -101,9 +101,6 @@ class Scheduler { /// returning. Status Schedule(QuerySchedule* schedule); - /// Releases the reserved resources (if any) from the given schedule. - Status Release(QuerySchedule* schedule); - private: /// Map from a host's IP address to the next backend to be round-robin scheduled for /// that host (needed for setups with multiple backends on a single host) @@ -324,9 +321,6 @@ class Scheduler { /// us. RequestPoolService* request_pool_service_; - /// Used to make admission decisions in 'Schedule()' - boost::scoped_ptr<AdmissionController> admission_controller_; - /// Helper methods to access backend_config_ (the shared_ptr, not its contents), /// protecting the access with backend_config_lock_. 
BackendConfigPtr GetBackendConfig() const; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index 8417cf1..313708e 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -177,6 +177,10 @@ DEFINE_int32(idle_query_timeout, 0, "The time, in seconds, that a query may be i "QUERY_TIMEOUT_S overrides this setting, but, if set, --idle_query_timeout represents" " the maximum allowable timeout."); +DEFINE_bool(is_coordinator, true, "If true, this Impala daemon can accept and coordinate " + "queries from clients. If false, this daemon will only execute query fragments, and " + "will refuse client connections."); + // TODO: Remove for Impala 3.0. DEFINE_string(local_nodemanager_url, "", "Deprecated"); @@ -244,7 +248,8 @@ class CancellationWork { }; ImpalaServer::ImpalaServer(ExecEnv* exec_env) - : exec_env_(exec_env) { + : exec_env_(exec_env), + thrift_serializer_(false) { // Initialize default config InitializeConfigVariables(); @@ -328,15 +333,21 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env) ABORT_IF_ERROR(ExternalDataSourceExecutor::InitJNI(exec_env->metrics())); // Register the membership callback if required - if (exec_env->subscriber() != NULL) { - StatestoreSubscriber::UpdateCallback cb = - bind<void>(mem_fn(&ImpalaServer::MembershipCallback), this, _1, _2); + if (exec_env->subscriber() != nullptr) { + auto cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state, + vector<TTopicDelta>* topic_updates) { + this->MembershipCallback(state, topic_updates); + }; exec_env->subscriber()->AddTopic(Scheduler::IMPALA_MEMBERSHIP_TOPIC, true, cb); - StatestoreSubscriber::UpdateCallback catalog_cb = - bind<void>(mem_fn(&ImpalaServer::CatalogUpdateCallback), this, _1, _2); - exec_env->subscriber()->AddTopic( - 
CatalogServer::IMPALA_CATALOG_TOPIC, true, catalog_cb); + if (FLAGS_is_coordinator) { + auto catalog_cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state, + vector<TTopicDelta>* topic_updates) { + this->CatalogUpdateCallback(state, topic_updates); + }; + exec_env->subscriber()->AddTopic(CatalogServer::IMPALA_CATALOG_TOPIC, true, + catalog_cb); + } } ABORT_IF_ERROR(UpdateCatalogMetrics()); @@ -358,6 +369,7 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env) query_expiration_thread_.reset(new Thread("impala-server", "query-expirer", bind<void>(&ImpalaServer::ExpireQueries, this))); + is_coordinator_ = FLAGS_is_coordinator; exec_env_->SetImpalaServer(this); } @@ -392,6 +404,8 @@ Status ImpalaServer::LogLineageRecord(const QueryExecState& query_exec_state) { return status; } +bool ImpalaServer::IsCoordinator() { return is_coordinator_; } + bool ImpalaServer::IsLineageLoggingEnabled() { return !FLAGS_lineage_event_log_dir.empty(); } @@ -562,7 +576,7 @@ Status ImpalaServer::InitProfileLogging() { Status ImpalaServer::GetRuntimeProfileStr(const TUniqueId& query_id, bool base64_encoded, stringstream* output) { - DCHECK(output != NULL); + DCHECK(output != nullptr); // Search for the query id in the active query map { shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false); @@ -599,9 +613,9 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, TExecSummary* res // Search for the query id in the active query map. 
{ shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true); - if (exec_state != NULL) { + if (exec_state != nullptr) { lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t()); - if (exec_state->coord() != NULL) { + if (exec_state->coord() != nullptr) { TExecProgress progress; { lock_guard<SpinLock> lock(exec_state->coord()->GetExecSummaryLock()); @@ -687,7 +701,7 @@ void ImpalaServer::ArchiveQuery(const QueryExecState& query) { if (FLAGS_query_log_size == 0) return; QueryStateRecord record(query, true, encoded_profile_str); - if (query.coord() != NULL) { + if (query.coord() != nullptr) { lock_guard<SpinLock> lock(query.coord()->GetExecSummaryLock()); record.exec_summary = query.coord()->exec_summary(); } @@ -771,7 +785,7 @@ Status ImpalaServer::ExecuteInternal( shared_ptr<SessionState> session_state, bool* registered_exec_state, shared_ptr<QueryExecState>* exec_state) { - DCHECK(session_state != NULL); + DCHECK(session_state != nullptr); *registered_exec_state = false; exec_state->reset(new QueryExecState(query_ctx, exec_env_, exec_env_->frontend(), @@ -820,7 +834,7 @@ Status ImpalaServer::ExecuteInternal( } } - if ((*exec_state)->coord() != NULL) { + if ((*exec_state)->coord() != nullptr) { const unordered_set<TNetworkAddress>& unique_hosts = (*exec_state)->schedule()->unique_hosts(); if (!unique_hosts.empty()) { @@ -944,7 +958,7 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli exec_state->session()->inflight_queries.erase(query_id); } - if (exec_state->coord() != NULL) { + if (exec_state->coord() != nullptr) { string exec_summary; { lock_guard<SpinLock> lock(exec_state->coord()->GetExecSummaryLock()); @@ -975,12 +989,12 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli Status ImpalaServer::UpdateCatalogMetrics() { TGetDbsResult dbs; - RETURN_IF_ERROR(exec_env_->frontend()->GetDbs(NULL, NULL, &dbs)); + RETURN_IF_ERROR(exec_env_->frontend()->GetDbs(nullptr, nullptr, &dbs)); 
ImpaladMetrics::CATALOG_NUM_DBS->set_value(dbs.dbs.size()); ImpaladMetrics::CATALOG_NUM_TABLES->set_value(0L); for (const TDatabase& db: dbs.dbs) { TGetTablesResult table_names; - RETURN_IF_ERROR(exec_env_->frontend()->GetTableNames(db.db_name, NULL, NULL, + RETURN_IF_ERROR(exec_env_->frontend()->GetTableNames(db.db_name, nullptr, nullptr, &table_names)); ImpaladMetrics::CATALOG_NUM_TABLES->Increment(table_names.tables.size()); } @@ -992,7 +1006,7 @@ Status ImpalaServer::CancelInternal(const TUniqueId& query_id, bool check_inflig const Status* cause) { VLOG_QUERY << "Cancel(): query_id=" << PrintId(query_id); shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false); - if (exec_state == NULL) return Status("Invalid or unknown query handle"); + if (exec_state == nullptr) return Status("Invalid or unknown query handle"); exec_state->Cancel(check_inflight, cause); return Status::OK(); } @@ -1014,7 +1028,7 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id, session_state = entry->second; session_state_map_.erase(session_id); } - DCHECK(session_state != NULL); + DCHECK(session_state != nullptr); if (session_state->session_type == TSessionType::BEESWAX) { ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_BEESWAX_SESSIONS->Increment(-1L); } else { @@ -1081,7 +1095,7 @@ void ImpalaServer::ReportExecStatus( // every report (assign each query a local int32_t id and use that to index into a // vector of QueryExecStates, w/o lookup or locking?) shared_ptr<QueryExecState> exec_state = GetQueryExecState(params.query_id, false); - if (exec_state.get() == NULL) { + if (exec_state.get() == nullptr) { // This is expected occasionally (since a report RPC might be in flight while // cancellation is happening). Return an error to the caller to get it to stop. 
const string& err = Substitute("ReportExecStatus(): Received report for unknown " @@ -1475,6 +1489,10 @@ void ImpalaServer::MembershipCallback( // This is a new item - add it to the map of known backends. known_backends_.insert(make_pair(item.key, backend_descriptor)); } + + // Register the local backend in the statestore and update the list of known backends. + AddLocalBackendToStatestore(subscriber_topic_updates); + // Process membership deletions. for (const string& backend_id: delta.topic_deletions) { known_backends_.erase(backend_id); @@ -1559,13 +1577,48 @@ void ImpalaServer::MembershipCallback( } } +void ImpalaServer::AddLocalBackendToStatestore( + vector<TTopicDelta>* subscriber_topic_updates) { + const string& local_backend_id = exec_env_->subscriber()->id(); + if (known_backends_.find(local_backend_id) != known_backends_.end()) return; + + TBackendDescriptor local_backend_descriptor; + local_backend_descriptor.__set_address( + MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)); + IpAddr ip; + const Hostname& hostname = local_backend_descriptor.address.hostname; + Status status = HostnameToIpAddr(hostname, &ip); + if (!status.ok()) { + // TODO: Should we do something about this failure? 
+ LOG(WARNING) << "Failed to convert hostname " << hostname << " to IP address: " + << status.GetDetail(); + return; + } + local_backend_descriptor.ip_address = ip; + subscriber_topic_updates->emplace_back(TTopicDelta()); + TTopicDelta& update = subscriber_topic_updates->back(); + update.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC; + update.topic_entries.emplace_back(TTopicItem()); + + TTopicItem& item = update.topic_entries.back(); + item.key = local_backend_id; + status = thrift_serializer_.Serialize(&local_backend_descriptor, &item.value); + if (!status.ok()) { + LOG(WARNING) << "Failed to serialize Impala backend descriptor for statestore topic:" + << " " << status.GetDetail(); + subscriber_topic_updates->pop_back(); + } else { + known_backends_.insert(make_pair(item.key, local_backend_descriptor)); + } +} + ImpalaServer::QueryStateRecord::QueryStateRecord(const QueryExecState& exec_state, bool copy_profile, const string& encoded_profile) { id = exec_state.query_id(); const TExecRequest& request = exec_state.exec_request(); const string* plan_str = exec_state.summary_profile().GetInfoString("Plan"); - if (plan_str != NULL) plan = *plan_str; + if (plan_str != nullptr) plan = *plan_str; stmt = exec_state.sql_stmt(); stmt_type = request.stmt_type; effective_user = exec_state.effective_user(); @@ -1575,7 +1628,7 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const QueryExecState& exec_stat has_coord = false; Coordinator* coord = exec_state.coord(); - if (coord != NULL) { + if (coord != nullptr) { num_complete_fragments = coord->progress().num_complete(); total_fragments = coord->progress().total(); has_coord = true; @@ -1770,7 +1823,7 @@ void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) { if (expiration_event->first > now) break; shared_ptr<QueryExecState> query_state = GetQueryExecState(expiration_event->second, false); - if (query_state.get() == NULL) { + if (query_state.get() == nullptr) { // Query was deleted some other way. 
queries_by_timestamp_.erase(expiration_event++); continue; @@ -1830,17 +1883,43 @@ void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) { Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int be_port, ThriftServer** beeswax_server, ThriftServer** hs2_server, ThriftServer** be_server, - ImpalaServer** impala_server) { - DCHECK((beeswax_port == 0) == (beeswax_server == NULL)); - DCHECK((hs2_port == 0) == (hs2_server == NULL)); - DCHECK((be_port == 0) == (be_server == NULL)); + boost::shared_ptr<ImpalaServer>* impala_server) { + DCHECK((beeswax_port == 0) == (beeswax_server == nullptr)); + DCHECK((hs2_port == 0) == (hs2_server == nullptr)); + DCHECK((be_port == 0) == (be_server == nullptr)); - boost::shared_ptr<ImpalaServer> handler(new ImpalaServer(exec_env)); + impala_server->reset(new ImpalaServer(exec_env)); - if (beeswax_port != 0 && beeswax_server != NULL) { + if (be_port != 0 && be_server != nullptr) { + boost::shared_ptr<ImpalaInternalService> thrift_if(new ImpalaInternalService()); + boost::shared_ptr<TProcessor> be_processor( + new ImpalaInternalServiceProcessor(thrift_if)); + boost::shared_ptr<TProcessorEventHandler> event_handler( + new RpcEventHandler("backend", exec_env->metrics())); + be_processor->setEventHandler(event_handler); + + *be_server = new ThriftServer("backend", be_processor, be_port, nullptr, + exec_env->metrics(), FLAGS_be_service_threads); + if (EnableInternalSslConnections()) { + LOG(INFO) << "Enabling SSL for backend"; + RETURN_IF_ERROR((*be_server)->EnableSsl(FLAGS_ssl_server_certificate, + FLAGS_ssl_private_key, FLAGS_ssl_private_key_password_cmd)); + } + + LOG(INFO) << "ImpalaInternalService listening on " << be_port; + } + if (!FLAGS_is_coordinator) { + LOG(INFO) << "Started worker Impala server on " + << ExecEnv::GetInstance()->backend_address(); + return Status::OK(); + } + + // Initialize the HS2 and Beeswax services. 
+ if (beeswax_port != 0 && beeswax_server != nullptr) { // Beeswax FE must be a TThreadPoolServer because ODBC and Hue only support // TThreadPoolServer. - boost::shared_ptr<TProcessor> beeswax_processor(new ImpalaServiceProcessor(handler)); + boost::shared_ptr<TProcessor> beeswax_processor( + new ImpalaServiceProcessor(*impala_server)); boost::shared_ptr<TProcessorEventHandler> event_handler( new RpcEventHandler("beeswax", exec_env->metrics())); beeswax_processor->setEventHandler(event_handler); @@ -1848,7 +1927,7 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int beeswax_port, AuthManager::GetInstance()->GetExternalAuthProvider(), exec_env->metrics(), FLAGS_fe_service_threads, ThriftServer::ThreadPool); - (*beeswax_server)->SetConnectionHandler(handler.get()); + (*beeswax_server)->SetConnectionHandler(impala_server->get()); if (!FLAGS_ssl_server_certificate.empty()) { LOG(INFO) << "Enabling SSL for Beeswax"; RETURN_IF_ERROR((*beeswax_server)->EnableSsl(FLAGS_ssl_server_certificate, @@ -1858,10 +1937,10 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_port; } - if (hs2_port != 0 && hs2_server != NULL) { + if (hs2_port != 0 && hs2_server != nullptr) { // HiveServer2 JDBC driver does not support non-blocking server. 
boost::shared_ptr<TProcessor> hs2_fe_processor( - new ImpalaHiveServer2ServiceProcessor(handler)); + new ImpalaHiveServer2ServiceProcessor(*impala_server)); boost::shared_ptr<TProcessorEventHandler> event_handler( new RpcEventHandler("hs2", exec_env->metrics())); hs2_fe_processor->setEventHandler(event_handler); @@ -1870,7 +1949,7 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int AuthManager::GetInstance()->GetExternalAuthProvider(), exec_env->metrics(), FLAGS_fe_service_threads, ThriftServer::ThreadPool); - (*hs2_server)->SetConnectionHandler(handler.get()); + (*hs2_server)->SetConnectionHandler(impala_server->get()); if (!FLAGS_ssl_server_certificate.empty()) { LOG(INFO) << "Enabling SSL for HiveServer2"; RETURN_IF_ERROR((*hs2_server)->EnableSsl(FLAGS_ssl_server_certificate, @@ -1880,32 +1959,14 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int LOG(INFO) << "Impala HiveServer2 Service listening on " << hs2_port; } - if (be_port != 0 && be_server != NULL) { - boost::shared_ptr<ImpalaInternalService> thrift_if(new ImpalaInternalService()); - boost::shared_ptr<TProcessor> be_processor( - new ImpalaInternalServiceProcessor(thrift_if)); - boost::shared_ptr<TProcessorEventHandler> event_handler( - new RpcEventHandler("backend", exec_env->metrics())); - be_processor->setEventHandler(event_handler); - - *be_server = new ThriftServer("backend", be_processor, be_port, NULL, - exec_env->metrics(), FLAGS_be_service_threads); - if (EnableInternalSslConnections()) { - LOG(INFO) << "Enabling SSL for backend"; - RETURN_IF_ERROR((*be_server)->EnableSsl(FLAGS_ssl_server_certificate, - FLAGS_ssl_private_key, FLAGS_ssl_private_key_password_cmd)); - } - - LOG(INFO) << "ImpalaInternalService listening on " << be_port; - } - if (impala_server != NULL) *impala_server = handler.get(); - + LOG(INFO) << "Started coordinator Impala server on " + << ExecEnv::GetInstance()->backend_address(); return Status::OK(); } bool 
ImpalaServer::GetSessionIdForQuery(const TUniqueId& query_id, TUniqueId* session_id) { - DCHECK(session_id != NULL); + DCHECK(session_id != nullptr); lock_guard<mutex> l(query_exec_state_map_lock_); QueryExecStateMap::iterator i = query_exec_state_map_.find(query_id); if (i == query_exec_state_map_.end()) { @@ -1931,7 +1992,7 @@ shared_ptr<ImpalaServer::QueryExecState> ImpalaServer::GetQueryExecState( void ImpalaServer::UpdateFilter(TUpdateFilterResult& result, const TUpdateFilterParams& params) { shared_ptr<QueryExecState> query_exec_state = GetQueryExecState(params.query_id, false); - if (query_exec_state.get() == NULL) { + if (query_exec_state.get() == nullptr) { LOG(INFO) << "Could not find query exec state: " << params.query_id; return; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/service/impala-server.h ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h index 588a5c3..45e8080 100644 --- a/be/src/service/impala-server.h +++ b/be/src/service/impala-server.h @@ -270,6 +270,9 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// Returns true if lineage logging is enabled, false otherwise. bool IsLineageLoggingEnabled(); + /// Retuns true if this is a coordinator, false otherwise. + bool IsCoordinator(); + /// The prefix of audit event log filename. static const string AUDIT_EVENT_LOG_FILE_PREFIX; @@ -432,6 +435,10 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// on why the failure occurred. Status AuthorizeProxyUser(const std::string& user, const std::string& do_as_user); + // Check if the local backend descriptor is in the list of known backends. If not, add + // it to the list of known backends and add it to the 'topic_updates'. 
+ void AddLocalBackendToStatestore(std::vector<TTopicDelta>* topic_updates); + /// Snapshot of a query's state, archived in the query log. struct QueryStateRecord { /// Pretty-printed runtime profile. TODO: Copy actual profile object @@ -942,6 +949,13 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// Container for a thread that runs ExpireQueries() if FLAGS_idle_query_timeout is set. boost::scoped_ptr<Thread> query_expiration_thread_; + + /// Serializes TBackendDescriptors when creating topic updates + ThriftSerializer thrift_serializer_; + + /// True if this ImpalaServer can accept client connections and coordinate + /// queries. + bool is_coordinator_; }; /// Create an ImpalaServer and Thrift servers. @@ -949,6 +963,8 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// ImpalaService (Beeswax) on beeswax_port (returned via beeswax_server). /// If hs2_port != 0 (and hs2_server != NULL), creates a ThriftServer exporting /// ImpalaHiveServer2Service on hs2_port (returned via hs2_server). +/// ImpalaService and ImpalaHiveServer2Service are initialized only if this +/// Impala server is a coordinator (indicated by the is_coordinator flag). /// If be_port != 0 (and be_server != NULL), create a ThriftServer exporting /// ImpalaInternalService on be_port (returned via be_server). /// Returns created ImpalaServer. The caller owns fe_server and be_server. @@ -958,7 +974,7 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, /// which case none of the output parameters can be assumed to be valid. 
Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int be_port, ThriftServer** beeswax_server, ThriftServer** hs2_server, - ThriftServer** be_server, ImpalaServer** impala_server); + ThriftServer** be_server, boost::shared_ptr<ImpalaServer>* impala_server); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/service/impalad-main.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc index 4848b45..01e5b55 100644 --- a/be/src/service/impalad-main.cc +++ b/be/src/service/impalad-main.cc @@ -56,6 +56,7 @@ DECLARE_int32(hs2_port); DECLARE_int32(be_port); DECLARE_string(principal); DECLARE_bool(enable_rm); +DECLARE_bool(is_coordinator); int ImpaladMain(int argc, char** argv) { InitCommonRuntime(argc, argv, true); @@ -84,14 +85,17 @@ int ImpaladMain(int argc, char** argv) { ThriftServer* beeswax_server = NULL; ThriftServer* hs2_server = NULL; ThriftServer* be_server = NULL; - ImpalaServer* server = NULL; + boost::shared_ptr<ImpalaServer> server; ABORT_IF_ERROR(CreateImpalaServer(&exec_env, FLAGS_beeswax_port, FLAGS_hs2_port, FLAGS_be_port, &beeswax_server, &hs2_server, &be_server, &server)); ABORT_IF_ERROR(be_server->Start()); - ABORT_IF_ERROR(beeswax_server->Start()); - ABORT_IF_ERROR(hs2_server->Start()); + if (FLAGS_is_coordinator) { + ABORT_IF_ERROR(beeswax_server->Start()); + ABORT_IF_ERROR(hs2_server->Start()); + } + Status status = exec_env.StartServices(); if (!status.ok()) { LOG(ERROR) << "Impalad services did not start correctly, exiting. 
Error: " @@ -101,13 +105,17 @@ int ImpaladMain(int argc, char** argv) { } ImpaladMetrics::IMPALA_SERVER_READY->set_value(true); LOG(INFO) << "Impala has started."; - // this blocks until the beeswax and hs2 servers terminate - beeswax_server->Join(); - hs2_server->Join(); + be_server->Join(); delete be_server; - delete beeswax_server; - delete hs2_server; + + if (FLAGS_is_coordinator) { + // this blocks until the beeswax and hs2 servers terminate + beeswax_server->Join(); + hs2_server->Join(); + delete beeswax_server; + delete hs2_server; + } return 0; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/service/query-exec-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc index 2cf2e5d..f704676 100644 --- a/be/src/service/query-exec-state.cc +++ b/be/src/service/query-exec-state.cc @@ -25,6 +25,7 @@ #include "runtime/row-batch.h" #include "runtime/runtime-state.h" #include "runtime/exec-env.h" +#include "scheduling/admission-controller.h" #include "scheduling/scheduler.h" #include "service/frontend.h" #include "service/impala-server.h" @@ -456,6 +457,14 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest( RETURN_IF_ERROR(UpdateQueryStatus(status)); } + if (exec_env_->admission_controller() != nullptr) { + status = exec_env_->admission_controller()->AdmitQuery(schedule_.get()); + { + lock_guard<mutex> l(lock_); + RETURN_IF_ERROR(UpdateQueryStatus(status)); + } + } + coord_.reset(new Coordinator(*schedule_, exec_env_, query_events_)); status = coord_->Exec(); { @@ -570,10 +579,12 @@ void ImpalaServer::QueryExecState::Done() { if (coord_.get() != NULL) { // Release any reserved resources. 
- Status status = exec_env_->scheduler()->Release(schedule_.get()); - if (!status.ok()) { - LOG(WARNING) << "Failed to release resources of query " << schedule_->query_id() - << " because of error: " << status.GetDetail(); + if (exec_env_->admission_controller() != nullptr) { + Status status = exec_env_->admission_controller()->ReleaseQuery(schedule_.get()); + if (!status.ok()) { + LOG(WARNING) << "Failed to release resources of query " << schedule_->query_id() + << " because of error: " << status.GetDetail(); + } } coord_->TearDown(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/testutil/in-process-servers.h ---------------------------------------------------------------------- diff --git a/be/src/testutil/in-process-servers.h b/be/src/testutil/in-process-servers.h index 54c7484..870f02d 100644 --- a/be/src/testutil/in-process-servers.h +++ b/be/src/testutil/in-process-servers.h @@ -69,7 +69,7 @@ class InProcessImpalaServer { /// there was an error joining. Status Join(); - ImpalaServer* impala_server() { return impala_server_; } + ImpalaServer* impala_server() { return impala_server_.get(); } MetricGroup* metrics() { return exec_env_->metrics(); } @@ -92,10 +92,9 @@ class InProcessImpalaServer { uint32_t hs2_port_; - /// The ImpalaServer that handles client and backend requests. Not owned by this class; - /// instead it's owned via shared_ptrs in the ThriftServers. See CreateImpalaServer for - /// details. - ImpalaServer* impala_server_; + /// The ImpalaServer that handles client and backend requests. Ownership is shared via + /// shared_ptrs with the ThriftServers. See CreateImpalaServer for details. 
+ boost::shared_ptr<ImpalaServer> impala_server_; /// ExecEnv holds much of the per-service state boost::scoped_ptr<ExecEnv> exec_env_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/util/webserver.cc ---------------------------------------------------------------------- diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc index 61ef3ea..3b7c4f9 100644 --- a/be/src/util/webserver.cc +++ b/be/src/util/webserver.cc @@ -34,6 +34,8 @@ #include "common/logging.h" #include "rpc/thrift-util.h" +#include "runtime/exec-env.h" +#include "service/impala-server.h" #include "thirdparty/mustache/mustache.h" #include "util/asan.h" #include "util/coding-util.h" @@ -97,6 +99,8 @@ DEFINE_string(webserver_password_file, "", DEFINE_string(webserver_x_frame_options, "DENY", "webserver will add X-Frame-Options HTTP header with this value"); +DECLARE_bool(is_coordinator); + static const char* DOC_FOLDER = "/www/"; static const int DOC_FOLDER_LEN = strlen(DOC_FOLDER); @@ -116,7 +120,7 @@ static const char* ERROR_KEY = "__error_msg__"; const char* GetDefaultDocumentRoot() { stringstream ss; char* impala_home = getenv("IMPALA_HOME"); - if (impala_home == NULL) { + if (impala_home == nullptr) { return ""; // Empty document root means don't serve static files } else { ss << impala_home; @@ -152,7 +156,7 @@ string BuildHeaderString(ResponseCode response, ContentType content_type) { } Webserver::Webserver() - : context_(NULL), + : context_(nullptr), error_handler_(UrlHandler(bind<void>(&Webserver::ErrorHandler, this, _1, _2), "error.tmpl", false)) { http_address_ = MakeNetworkAddress( @@ -161,7 +165,7 @@ Webserver::Webserver() } Webserver::Webserver(const int port) - : context_(NULL), + : context_(nullptr), error_handler_(UrlHandler(bind<void>(&Webserver::ErrorHandler, this, _1, _2), "error.tmpl", false)) { http_address_ = MakeNetworkAddress("0.0.0.0", port); @@ -186,6 +190,13 @@ void Webserver::RootHandler(const ArgumentMap& args, Document* document) { 
document->GetAllocator()); document->AddMember("process_state_info", process_state_info, document->GetAllocator()); + + ExecEnv* env = ExecEnv::GetInstance(); + if (env == nullptr || env->impala_server() == nullptr) return; + string mode = (env->impala_server()->IsCoordinator()) ? + "Coordinator + Executor" : "Executor"; + Value impala_server_mode(mode.c_str(), document->GetAllocator()); + document->AddMember("impala_server_mode", impala_server_mode, document->GetAllocator()); } void Webserver::ErrorHandler(const ArgumentMap& args, Document* document) { @@ -300,7 +311,7 @@ Status Webserver::Start() { options.push_back("no"); // Options must be a NULL-terminated list - options.push_back(NULL); + options.push_back(nullptr); // squeasel ignores SIGCHLD and we need it to run kinit. This means that since // squeasel does not reap its own children CGI programs must be avoided. @@ -321,7 +332,7 @@ Status Webserver::Start() { // Restore the child signal handler so wait() works properly. signal(SIGCHLD, sig_chld); - if (context_ == NULL) { + if (context_ == nullptr) { stringstream error_msg; error_msg << "Webserver: Could not start on address " << http_address_; return Status(error_msg.str()); @@ -337,14 +348,14 @@ Status Webserver::Start() { } void Webserver::Stop() { - if (context_ != NULL) { + if (context_ != nullptr) { sq_stop(context_); - context_ = NULL; + context_ = nullptr; } } void Webserver::GetCommonJson(Document* document) { - DCHECK(document != NULL); + DCHECK(document != nullptr); Value obj(kObjectType); obj.AddMember("process-name", google::ProgramInvocationShortName(), document->GetAllocator()); @@ -365,7 +376,7 @@ void Webserver::GetCommonJson(Document* document) { int Webserver::LogMessageCallbackStatic(const struct sq_connection* connection, const char* message) { - if (message != NULL) { + if (message != nullptr) { LOG(INFO) << "Webserver: " << message; } return PROCESSING_COMPLETE; @@ -389,7 +400,7 @@ int Webserver::BeginRequestCallback(struct 
sq_connection* connection, } map<string, string> arguments; - if (request_info->query_string != NULL) { + if (request_info->query_string != nullptr) { BuildArgumentMap(request_info->query_string, &arguments); } @@ -397,7 +408,7 @@ int Webserver::BeginRequestCallback(struct sq_connection* connection, UrlHandlerMap::const_iterator it = url_handlers_.find(request_info->uri); ResponseCode response = OK; ContentType content_type = HTML; - const UrlHandler* url_handler = NULL; + const UrlHandler* url_handler = nullptr; if (it == url_handlers_.end()) { response = NOT_FOUND; arguments[ERROR_KEY] = Substitute("No URI handler for '$0'", request_info->uri); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/bin/start-impala-cluster.py ---------------------------------------------------------------------- diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py index b276b08..37671e3 100755 --- a/bin/start-impala-cluster.py +++ b/bin/start-impala-cluster.py @@ -36,6 +36,8 @@ DEFAULT_IMPALA_MAX_LOG_FILES = os.environ.get('IMPALA_MAX_LOG_FILES', 10) parser = OptionParser() parser.add_option("-s", "--cluster_size", type="int", dest="cluster_size", default=3, help="Size of the cluster (number of impalad instances to start).") +parser.add_option("-c", "--num_coordinators", type="int", dest="num_coordinators", + default=3, help="Number of coordinators.") parser.add_option("--build_type", dest="build_type", default= 'latest', help="Build type to use - debug / release / latest") parser.add_option("--impalad_args", dest="impalad_args", action="append", type="string", @@ -204,7 +206,7 @@ def build_jvm_args(instance_num): BASE_JVM_DEBUG_PORT = 30000 return JVM_ARGS % (BASE_JVM_DEBUG_PORT + instance_num, options.jvm_args) -def start_impalad_instances(cluster_size): +def start_impalad_instances(cluster_size, num_coordinators): if cluster_size == 0: # No impalad instances should be started. 
return @@ -239,6 +241,10 @@ def start_impalad_instances(cluster_size): if options.kudu_master_hosts: # Must be prepended, otherwise the java options interfere. args = "-kudu_master_hosts %s %s" % (options.kudu_master_hosts, args) + + if i >= num_coordinators: + args = "-is_coordinator=false %s" % (args) + stderr_log_file_path = os.path.join(options.log_dir, '%s-error.log' % service_name) exec_impala_process(IMPALAD_PATH, args, stderr_log_file_path) @@ -279,9 +285,10 @@ def wait_for_cluster_web(timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS): # impalad processes may take a while to come up. wait_for_impala_process_count(impala_cluster) for impalad in impala_cluster.impalads: - impalad.service.wait_for_num_known_live_backends(options.cluster_size, - timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2) - wait_for_catalog(impalad, timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS) + if impalad._get_arg_value('is_coordinator', default='true') == 'true': + impalad.service.wait_for_num_known_live_backends(options.cluster_size, + timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2) + wait_for_catalog(impalad, timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS) def wait_for_catalog(impalad, timeout_in_seconds): """Waits for the impalad catalog to become ready""" @@ -326,6 +333,10 @@ if __name__ == "__main__": print 'Please specify a cluster size >= 0' sys.exit(1) + if options.num_coordinators <= 0: + print 'Please specify a valid number of coordinators > 0' + sys.exit(1) + if not os.path.isdir(options.log_dir): print 'Log dir does not exist or is not a directory: %s' % options.log_dir sys.exit(1) @@ -372,7 +383,7 @@ if __name__ == "__main__": if not options.restart_impalad_only: start_statestore() start_catalogd() - start_impalad_instances(options.cluster_size) + start_impalad_instances(options.cluster_size, options.num_coordinators) # Sleep briefly to reduce log spam: the cluster takes some time to start up. 
sleep(3) wait_for_cluster() @@ -380,4 +391,5 @@ if __name__ == "__main__": print 'Error starting cluster: %s' % e sys.exit(1) - print 'Impala Cluster Running with %d nodes.' % options.cluster_size + print 'Impala Cluster Running with %d nodes and %d coordinators.' % ( + options.cluster_size, options.num_coordinators) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/tests/common/custom_cluster_test_suite.py ---------------------------------------------------------------------- diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py index 28f598c..b19b5d2 100644 --- a/tests/common/custom_cluster_test_suite.py +++ b/tests/common/custom_cluster_test_suite.py @@ -30,6 +30,7 @@ from time import sleep IMPALA_HOME = os.environ['IMPALA_HOME'] CLUSTER_SIZE = 3 +NUM_COORDINATORS = CLUSTER_SIZE # The number of statestore subscribers is CLUSTER_SIZE (# of impalad) + 1 (for catalogd). NUM_SUBSCRIBERS = CLUSTER_SIZE + 1 @@ -107,10 +108,11 @@ class CustomClusterTestSuite(ImpalaTestSuite): @classmethod def _start_impala_cluster(cls, options, log_dir=os.getenv('LOG_DIR', "/tmp/"), - cluster_size=CLUSTER_SIZE, log_level=1): + cluster_size=CLUSTER_SIZE, num_coordinators=NUM_COORDINATORS, log_level=1): cls.impala_log_dir = log_dir cmd = [os.path.join(IMPALA_HOME, 'bin/start-impala-cluster.py'), '--cluster_size=%d' % cluster_size, + '--num_coordinators=%d' % num_coordinators, '--log_dir=%s' % log_dir, '--log_level=%s' % log_level] try: @@ -123,7 +125,8 @@ class CustomClusterTestSuite(ImpalaTestSuite): raise Exception("statestored was not found") statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS, timeout=60) for impalad in cls.cluster.impalads: - impalad.service.wait_for_num_known_live_backends(CLUSTER_SIZE, timeout=60) + if impalad._get_arg_value('is_coordinator', default='true') == 'true': + impalad.service.wait_for_num_known_live_backends(CLUSTER_SIZE, timeout=60) def assert_impalad_log_contains(self, level, 
line_regex, expected_count=1): """ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/tests/common/impala_service.py ---------------------------------------------------------------------- diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py index 4e9c815..265b1b6 100644 --- a/tests/common/impala_service.py +++ b/tests/common/impala_service.py @@ -26,6 +26,10 @@ import urllib from time import sleep, time from tests.common.impala_connection import create_connection, create_ldap_connection +from TCLIService import TCLIService +from thrift.transport.TSocket import TSocket +from thrift.transport.TTransport import TBufferedTransport +from thrift.protocol import TBinaryProtocol logging.basicConfig(level=logging.ERROR, format='%(threadName)s: %(message)s') LOG = logging.getLogger('impala_service') @@ -201,6 +205,16 @@ class ImpaladService(BaseImpalaService): client.connect() return client + def create_hs2_client(self): + """Creates a new HS2 client connection to the impalad""" + host, port = (self.hostname, self.hs2_port) + socket = TSocket(host, port) + transport = TBufferedTransport(socket) + transport.open() + protocol = TBinaryProtocol.TBinaryProtocol(transport) + hs2_client = TCLIService.Client(protocol) + return hs2_client + def get_catalog_object_dump(self, object_type, object_name): return self.read_debug_webpage('catalog_objects?object_type=%s&object_name=%s' %\ (object_type, object_name)) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/tests/custom_cluster/test_coordinators.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_coordinators.py b/tests/custom_cluster/test_coordinators.py new file mode 100644 index 0000000..6010404 --- /dev/null +++ b/tests/custom_cluster/test_coordinators.py @@ -0,0 +1,86 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# The base class that should be used for almost all Impala tests + +import pytest + +from tests.common.custom_cluster_test_suite import CustomClusterTestSuite + +class TestCoordinators(CustomClusterTestSuite): + @pytest.mark.execute_serially + def test_multiple_coordinators(self): + """Test a cluster configuration in which not all impalad nodes are coordinators. 
+ Verify that only coordinators can accept client connections and that select and DDL + queries run successfully.""" + + db_name = "TEST_MUL_COORD_DB" + self._start_impala_cluster([], num_coordinators=2, cluster_size=3) + assert len(self.cluster.impalads) == 3 + + coordinator1 = self.cluster.impalads[0] + coordinator2 = self.cluster.impalads[1] + worker = self.cluster.impalads[2] + + # Verify that Beeswax and HS2 client connections can't be established at a worker node + beeswax_client = None + try: + beeswax_client = worker.service.create_beeswax_client() + except: pass + finally: + assert beeswax_client is None + + hs2_client = None + try: + hs2_client = worker.service.create_hs2_client() + except: pass + finally: + assert hs2_client is None + + # Verify that queries can successfully run on coordinator nodes + try: + client1 = coordinator1.service.create_beeswax_client() + client2 = coordinator2.service.create_beeswax_client() + + # select queries + self.execute_query_expect_success(client1, "select 1") + self.execute_query_expect_success(client2, "select * from functional.alltypes"); + # DDL queries w/o SYNC_DDL + self.execute_query_expect_success(client1, "refresh functional.alltypes") + query_options = {"sync_ddl" : 1} + self.execute_query_expect_success(client2, "refresh functional.alltypesagg", + query_options) + self.execute_query_expect_success(client1, + "create database if not exists %s" % db_name, query_options) + # Create a table using one coordinator + self.execute_query_expect_success(client1, + "create table %s.foo1 (col int)" % db_name, query_options) + # Drop the table using the other coordinator + self.execute_query_expect_success(client2, "drop table %s.foo1" % db_name, + query_options) + # Swap roles and repeat + self.execute_query_expect_success(client2, + "create table %s.foo2 (col int)" % db_name, query_options) + self.execute_query_expect_success(client1, "drop table %s.foo2" % db_name, + query_options) + 
self.execute_query_expect_success(client1, "drop database %s cascade" % db_name) + finally: + # Ensure the worker hasn't received any table metadata + num_tbls = worker.service.get_metric_value('catalog.num-tables') + assert num_tbls == 0 + client1.close() + client2.close() http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/www/root.tmpl ---------------------------------------------------------------------- diff --git a/www/root.tmpl b/www/root.tmpl index 916b48b..40d448a 100644 --- a/www/root.tmpl +++ b/www/root.tmpl @@ -18,6 +18,10 @@ under the License. --> {{! Template for / }} {{>www/common-header.tmpl}} + {{?impala_server_mode}} + <h2>Impala Server Mode: {{impala_server_mode}}</h2> + {{/impala_server_mode}} + <h2>Vers<span id="v">i</span>on</h2> <pre id="version_pre">{{version}}</pre>
