Repository: incubator-impala Updated Branches: refs/heads/master 8cab36cf6 -> 19a2dcfbe
IMPALA-3394: Add tests, make BackendConfig own class, refactor This change factors SimpleScheduler::BackendConfig into an own class and adds unit tests for it. Change-Id: I2d3acb6f68b16ca0af06dad0098d7ec1eff41202 Reviewed-on: http://gerrit.cloudera.org:8080/4116 Reviewed-by: Matthew Jacobs <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f5541d60 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f5541d60 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f5541d60 Branch: refs/heads/master Commit: f5541d604046a9f47d1000c2ddc8f38a732c6456 Parents: 8cab36c Author: Lars Volker <[email protected]> Authored: Fri Aug 19 00:36:19 2016 +0200 Committer: Internal Jenkins <[email protected]> Committed: Thu Aug 25 20:23:04 2016 +0000 ---------------------------------------------------------------------- be/src/scheduling/CMakeLists.txt | 2 + be/src/scheduling/backend-config-test.cc | 100 ++++++++++++++++++++ be/src/scheduling/backend-config.cc | 89 ++++++++++++++++++ be/src/scheduling/backend-config.h | 75 +++++++++++++++ be/src/scheduling/simple-scheduler.cc | 130 +++----------------------- be/src/scheduling/simple-scheduler.h | 65 ++----------- be/src/util/network-util.cc | 38 +++++++- be/src/util/network-util.h | 20 +++- 8 files changed, 337 insertions(+), 182 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f5541d60/be/src/scheduling/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/scheduling/CMakeLists.txt b/be/src/scheduling/CMakeLists.txt index 5ce7ff9..c5b4eb4 100644 --- a/be/src/scheduling/CMakeLists.txt +++ b/be/src/scheduling/CMakeLists.txt @@ -25,6 +25,7 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/scheduling") # TODO: Move other 
scheduling-related classes here add_library(Scheduling STATIC admission-controller.cc + backend-config.cc query-resource-mgr.cc query-schedule.cc request-pool-service.cc @@ -33,5 +34,6 @@ add_library(Scheduling STATIC add_dependencies(Scheduling thrift-deps) ADD_BE_TEST(simple-scheduler-test) +ADD_BE_TEST(backend-config-test) # TODO: Add BE test # ADD_BE_TEST(admission-controller-test) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f5541d60/be/src/scheduling/backend-config-test.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/backend-config-test.cc b/be/src/scheduling/backend-config-test.cc new file mode 100644 index 0000000..82dc6a5 --- /dev/null +++ b/be/src/scheduling/backend-config-test.cc @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gtest/gtest.h> + +#include "scheduling/backend-config.h" + +#include "common/logging.h" +#include "common/names.h" +#include "util/network-util.h" +#include "util/thread.h" + +namespace impala { + +/// Test that BackendConfig can be created from a vector of backends. +TEST(BackendConfigTest, MakeFromBackendVector) { + // This address needs to be resolvable using getaddrinfo(). 
+ vector<TNetworkAddress> backends {MakeNetworkAddress("localhost", 1001)}; + BackendConfig backend_config(backends); + IpAddr backend_ip; + bool ret = backend_config.LookUpBackendIp(backends[0].hostname, &backend_ip); + ASSERT_TRUE(ret); + EXPECT_EQ("127.0.0.1", backend_ip); +} + +/// Test adding multiple backends on different hosts. +TEST(BackendConfigTest, AddBackends) { + BackendConfig backend_config; + backend_config.AddBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1001)); + backend_config.AddBackend(MakeBackendDescriptor("host_2", "10.0.0.2", 1002)); + ASSERT_EQ(2, backend_config.NumBackends()); + IpAddr backend_ip; + ASSERT_TRUE(backend_config.LookUpBackendIp("host_1", &backend_ip)); + EXPECT_EQ("10.0.0.1", backend_ip); + ASSERT_TRUE(backend_config.LookUpBackendIp("host_2", &backend_ip)); + EXPECT_EQ("10.0.0.2", backend_ip); +} + +/// Test adding multiple backends on the same host. +TEST(BackendConfigTest, MultipleBackendsOnSameHost) { + BackendConfig backend_config; + backend_config.AddBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1001)); + backend_config.AddBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1002)); + IpAddr backend_ip; + ASSERT_TRUE(backend_config.LookUpBackendIp("host_1", &backend_ip)); + EXPECT_EQ("10.0.0.1", backend_ip); + const BackendConfig::BackendList& backend_list = + backend_config.GetBackendListForHost("10.0.0.1"); + EXPECT_EQ(2, backend_list.size()); +} + +/// Test removing a backend. 
+TEST(BackendConfigTest, RemoveBackend) { + BackendConfig backend_config; + backend_config.AddBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1001)); + backend_config.AddBackend(MakeBackendDescriptor("host_2", "10.0.0.2", 1002)); + backend_config.RemoveBackend(MakeBackendDescriptor("host_2", "10.0.0.2", 1002)); + IpAddr backend_ip; + ASSERT_TRUE(backend_config.LookUpBackendIp("host_1", &backend_ip)); + EXPECT_EQ("10.0.0.1", backend_ip); + ASSERT_FALSE(backend_config.LookUpBackendIp("host_2", &backend_ip)); +} + +/// Test removing one of multiple backends on the same host (IMPALA-3944). +TEST(BackendConfigTest, RemoveBackendOnSameHost) { + BackendConfig backend_config; + backend_config.AddBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1001)); + backend_config.AddBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1002)); + backend_config.RemoveBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1002)); + IpAddr backend_ip; + ASSERT_TRUE(backend_config.LookUpBackendIp("host_1", &backend_ip)); + EXPECT_EQ("10.0.0.1", backend_ip); + const BackendConfig::BackendList& backend_list = + backend_config.GetBackendListForHost("10.0.0.1"); + EXPECT_EQ(1, backend_list.size()); +} + +} // end namespace impala + +int main(int argc, char **argv) { + google::InitGoogleLogging(argv[0]); + impala::CpuInfo::Init(); + impala::InitThreading(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f5541d60/be/src/scheduling/backend-config.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/backend-config.cc b/be/src/scheduling/backend-config.cc new file mode 100644 index 0000000..e5c6824 --- /dev/null +++ b/be/src/scheduling/backend-config.cc @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "scheduling/backend-config.h" + +namespace impala { + +BackendConfig::BackendConfig(const std::vector<TNetworkAddress>& backends) { + // Construct backend_map and backend_ip_map. + for (const TNetworkAddress& backend: backends) { + IpAddr ip; + Status status = HostnameToIpAddr(backend.hostname, &ip); + if (!status.ok()) { + VLOG(1) << status.GetDetail(); + continue; + } + AddBackend(MakeBackendDescriptor(backend.hostname, ip, backend.port)); + } +} + +const BackendConfig::BackendList& BackendConfig::GetBackendListForHost( + const IpAddr& ip) const { + BackendMap::const_iterator it = backend_map_.find(ip); + DCHECK(it != backend_map_.end()); + return it->second; +} + +void BackendConfig::GetAllBackendIps(std::vector<IpAddr>* ip_addresses) const { + ip_addresses->reserve(NumBackends()); + for (const auto& entry: backend_map_) ip_addresses->push_back(entry.first); +} + +void BackendConfig::GetAllBackends(BackendList* backends) const { + for (const auto& backend_list: backend_map_) { + backends->insert(backends->end(), backend_list.second.begin(), + backend_list.second.end()); + } +} + +void BackendConfig::AddBackend(const TBackendDescriptor& be_desc) { + DCHECK(!be_desc.ip_address.empty()); + BackendList& be_descs = backend_map_[be_desc.ip_address]; + if (find(be_descs.begin(),
be_descs.end(), be_desc) == be_descs.end()) { + be_descs.push_back(be_desc); + } + backend_ip_map_[be_desc.address.hostname] = be_desc.ip_address; +} + +void BackendConfig::RemoveBackend(const TBackendDescriptor& be_desc) { + auto be_descs_it = backend_map_.find(be_desc.ip_address); + if (be_descs_it == backend_map_.end()) return; + be_descs_it->second.remove(be_desc); + if (!be_descs_it->second.empty()) return; + backend_map_.erase(be_descs_it); + // Erase exactly the hostnames that still map to this IP so that none is left dangling. + for (auto it = backend_ip_map_.begin(); it != backend_ip_map_.end();) { + if (it->second == be_desc.ip_address) it = backend_ip_map_.erase(it); else ++it; + } +} + +bool BackendConfig::LookUpBackendIp(const Hostname& hostname, IpAddr* ip) const { + // Check if hostname is already a valid IP address. + if (backend_map_.find(hostname) != backend_map_.end()) { + if (ip) *ip = hostname; + return true; + } + auto it = backend_ip_map_.find(hostname); + if (it != backend_ip_map_.end()) { + if (ip) *ip = it->second; + return true; + } + return false; +} + +} // end ns impala http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f5541d60/be/src/scheduling/backend-config.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/backend-config.h b/be/src/scheduling/backend-config.h new file mode 100644 index 0000000..25f8292 --- /dev/null +++ b/be/src/scheduling/backend-config.h @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef SCHEDULING_BACKEND_CONFIG_H +#define SCHEDULING_BACKEND_CONFIG_H + +#include <list> +#include <vector> + +#include <boost/unordered_map.hpp> + +#include "gen-cpp/StatestoreService_types.h" +#include "gen-cpp/Types_types.h" +#include "util/network-util.h" + +namespace impala { + +/// Configuration class to store a list of backends per IP address and a mapping from +/// hostnames to IP addresses. +class BackendConfig { + public: + BackendConfig() {} + /// Construct from a list of backend addresses; unresolvable hostnames are skipped. + BackendConfig(const std::vector<TNetworkAddress>& backends); + + /// List of backend descriptors. + typedef std::list<TBackendDescriptor> BackendList; + + /// Return the list of backends on a particular host. The caller must make sure that the + /// host is actually contained in backend_map_. + const BackendList& GetBackendListForHost(const IpAddr& ip) const; + + void GetAllBackendIps(std::vector<IpAddr>* ip_addresses) const; + void GetAllBackends(BackendList* backends) const; + void AddBackend(const TBackendDescriptor& be_desc); + void RemoveBackend(const TBackendDescriptor& be_desc); + + /// Look up the IP address of 'hostname' in the internal backend maps and return + /// whether the lookup was successful. If 'hostname' itself is a valid IP address and + /// is contained in backend_map_, then it is copied to 'ip' and true is returned. 'ip' + /// can be NULL if the caller only wants to check whether the lookup succeeds. Use this + /// method to resolve datanode hostnames to IP addresses during scheduling, to prevent +/// blocking on the OS.
+ bool LookUpBackendIp(const Hostname& hostname, IpAddr* ip) const; + /// Returns the number of distinct backend hosts (IP addresses), not backend processes. + int NumBackends() const { return backend_map_.size(); } + + private: + /// Map from a host's IP address to a list of backends running on that node. + typedef boost::unordered_map<IpAddr, BackendList> BackendMap; + BackendMap backend_map_; + + /// Map from a hostname to its IP address to support hostname based backend lookup. It + /// contains entries for all backends in backend_map_ and needs to be updated whenever + /// backend_map_ changes. + typedef boost::unordered_map<Hostname, IpAddr> BackendIpAddressMap; + BackendIpAddressMap backend_ip_map_; +}; + +} // end ns impala +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f5541d60/be/src/scheduling/simple-scheduler.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc index 5160393..9a865f0 100644 --- a/be/src/scheduling/simple-scheduler.cc +++ b/be/src/scheduling/simple-scheduler.cc @@ -185,7 +185,7 @@ Status SimpleScheduler::Init() { if (metrics_ != NULL) { // This is after registering with the statestored, so we already have to synchronize // access to the backend_config_ shared_ptr.
- int num_backends = GetBackendConfig()->backend_map().size(); + int num_backends = GetBackendConfig()->NumBackends(); total_assignments_ = metrics_->AddCounter<int64_t>(ASSIGNMENTS_KEY, 0); total_local_assignments_ = metrics_->AddCounter<int64_t>(LOCAL_ASSIGNMENTS_KEY, 0); initialized_ = metrics_->AddProperty(SCHEDULER_INIT_KEY, true); @@ -210,10 +210,11 @@ Status SimpleScheduler::Init() { void SimpleScheduler::BackendsUrlCallback(const Webserver::ArgumentMap& args, Document* document) { - BackendList backends; - GetAllKnownBackends(&backends); + BackendConfig::BackendList backends; + BackendConfigPtr backend_config = GetBackendConfig(); + backend_config->GetAllBackends(&backends); Value backends_list(kArrayType); - for (const BackendList::value_type& backend: backends) { + for (const TBackendDescriptor& backend: backends) { Value str(TNetworkAddressToString(backend.address).c_str(), document->GetAllocator()); backends_list.PushBack(str, document->GetAllocator()); } @@ -334,16 +335,6 @@ void SimpleScheduler::SetBackendConfig(const BackendConfigPtr& backend_config) backend_config_ = backend_config; } - -void SimpleScheduler::GetAllKnownBackends(BackendList* backends) { - backends->clear(); - BackendConfigPtr backend_config = GetBackendConfig(); - for (const BackendMap::value_type& backend_list: backend_config->backend_map()) { - backends->insert(backends->end(), backend_list.second.begin(), - backend_list.second.end()); - } -} - Status SimpleScheduler::ComputeScanRangeAssignment(const TQueryExecRequest& exec_request, QuerySchedule* schedule) { map<TPlanNodeId, vector<TScanRangeLocations>>::const_iterator entry; @@ -871,109 +862,21 @@ void SimpleScheduler::HandleLostResource(const TUniqueId& client_resource_id) { } } -Status SimpleScheduler::HostnameToIpAddr(const Hostname& hostname, IpAddr* ip) { - // Try to resolve via the operating system. 
- vector<IpAddr> ipaddrs; - Status status = HostnameToIpAddrs(hostname, &ipaddrs); - if (!status.ok() || ipaddrs.empty()) { - stringstream ss; - ss << "Failed to resolve " << hostname << ": " << status.GetDetail(); - return Status(ss.str()); - } - - // HostnameToIpAddrs() calls getaddrinfo() from glibc and will preserve the order of the - // result. RFC 3484 only specifies a partial order so we need to sort the addresses - // before picking the first non-localhost one. - sort(ipaddrs.begin(), ipaddrs.end()); - - // Try to find a non-localhost address, otherwise just use the first IP address - // returned. - *ip = ipaddrs[0]; - if (!FindFirstNonLocalhost(ipaddrs, ip)) { - VLOG(3) << "Only localhost addresses found for " << hostname; - } - return Status::OK(); -} - -SimpleScheduler::BackendConfig::BackendConfig( - const std::vector<TNetworkAddress>& backends) { - // Construct backend_map and backend_ip_map. - for (int i = 0; i < backends.size(); ++i) { - IpAddr ip; - Status status = HostnameToIpAddr(backends[i].hostname, &ip); - if (!status.ok()) { - VLOG(1) << status.GetDetail(); - continue; - } - - BackendMap::iterator it = backend_map_.find(ip); - if (it == backend_map_.end()) { - it = backend_map_.insert( - make_pair(ip, BackendList())).first; - backend_ip_map_[backends[i].hostname] = ip; - } - - TBackendDescriptor descriptor; - descriptor.address = MakeNetworkAddress(ip, backends[i].port); - descriptor.ip_address = ip; - it->second.push_back(descriptor); - } -} - -void SimpleScheduler::BackendConfig::AddBackend(const TBackendDescriptor& be_desc) { - DCHECK(!be_desc.ip_address.empty()); - BackendList* be_descs = &backend_map_[be_desc.ip_address]; - if (find(be_descs->begin(), be_descs->end(), be_desc) == be_descs->end()) { - be_descs->push_back(be_desc); - } - backend_ip_map_[be_desc.address.hostname] = be_desc.ip_address; -} - -void SimpleScheduler::BackendConfig::RemoveBackend(const TBackendDescriptor& be_desc) { - auto be_descs_it = 
backend_map_.find(be_desc.ip_address); - if (be_descs_it != backend_map_.end()) { - BackendList* be_descs = &be_descs_it->second; - be_descs->erase(remove(be_descs->begin(), be_descs->end(), be_desc), be_descs->end()); - if (be_descs->empty()) { - backend_map_.erase(be_descs_it); - backend_ip_map_.erase(be_desc.address.hostname); - } - } -} - -bool SimpleScheduler::BackendConfig::LookUpBackendIp(const Hostname& hostname, - IpAddr* ip) const { - // Check if hostname is already a valid IP address. - if (backend_map_.find(hostname) != backend_map_.end()) { - if (ip) *ip = hostname; - return true; - } - auto it = backend_ip_map_.find(hostname); - if (it != backend_ip_map_.end()) { - if (ip) *ip = it->second; - return true; - } - return false; -} - SimpleScheduler::AssignmentCtx::AssignmentCtx( const BackendConfig& backend_config, IntCounter* total_assignments, IntCounter* total_local_assignments) : backend_config_(backend_config), first_unused_backend_idx_(0), total_assignments_(total_assignments), total_local_assignments_(total_local_assignments) { - random_backend_order_.reserve(backend_map().size()); - for (auto& v: backend_map()) random_backend_order_.push_back(&v); + backend_config.GetAllBackendIps(&random_backend_order_); std::mt19937 g(rand()); std::shuffle(random_backend_order_.begin(), random_backend_order_.end(), g); // Initialize inverted map for backend rank lookups int i = 0; - for (const BackendMap::value_type* v: random_backend_order_) { - random_backend_rank_[v->first] = i++; - } + for (const IpAddr& ip: random_backend_order_) random_backend_rank_[ip] = i++; } -const SimpleScheduler::IpAddr* SimpleScheduler::AssignmentCtx::SelectLocalBackendHost( +const IpAddr* SimpleScheduler::AssignmentCtx::SelectLocalBackendHost( const std::vector<IpAddr>& data_locations, bool break_ties_by_rank) { DCHECK(!data_locations.empty()); // List of candidate indexes into 'data_locations'. 
@@ -1005,7 +908,7 @@ const SimpleScheduler::IpAddr* SimpleScheduler::AssignmentCtx::SelectLocalBacken return &data_locations[*min_rank_idx]; } -const SimpleScheduler::IpAddr* SimpleScheduler::AssignmentCtx::SelectRemoteBackendHost() { +const IpAddr* SimpleScheduler::AssignmentCtx::SelectRemoteBackendHost() { const IpAddr* candidate_ip; if (HasUnusedBackends()) { // Pick next unused backend. @@ -1024,11 +927,9 @@ bool SimpleScheduler::AssignmentCtx::HasUnusedBackends() const { return first_unused_backend_idx_ < random_backend_order_.size(); } -const SimpleScheduler::IpAddr* - SimpleScheduler::AssignmentCtx::GetNextUnusedBackendAndIncrement() { +const IpAddr* SimpleScheduler::AssignmentCtx::GetNextUnusedBackendAndIncrement() { DCHECK(HasUnusedBackends()); - const IpAddr* ip = &(random_backend_order_[first_unused_backend_idx_++])->first; - DCHECK(backend_map().find(*ip) != backend_map().end()); + const IpAddr* ip = &random_backend_order_[first_unused_backend_idx_++]; return ip; } @@ -1040,14 +941,14 @@ int SimpleScheduler::AssignmentCtx::GetBackendRank(const IpAddr& ip) const { void SimpleScheduler::AssignmentCtx::SelectBackendOnHost(const IpAddr& backend_ip, TBackendDescriptor* backend) { - BackendMap::const_iterator backend_it = backend_map().find(backend_ip); - DCHECK(backend_it != backend_map().end()); - const BackendList& backends_on_host = backend_it->second; + DCHECK(backend_config_.LookUpBackendIp(backend_ip, NULL)); + const BackendConfig::BackendList& backends_on_host = + backend_config_.GetBackendListForHost(backend_ip); DCHECK(backends_on_host.size() > 0); if (backends_on_host.size() == 1) { *backend = *backends_on_host.begin(); } else { - BackendList::const_iterator* next_backend_on_host; + BackendConfig::BackendList::const_iterator* next_backend_on_host; next_backend_on_host = FindOrInsert(&next_backend_per_host_, backend_ip, backends_on_host.begin()); DCHECK(find(backends_on_host.begin(), backends_on_host.end(), **next_backend_on_host) @@ -1078,7 +979,6 
@@ void SimpleScheduler::AssignmentCtx::RecordScanRangeAssignment( IpAddr backend_ip; backend_config_.LookUpBackendIp(backend.address.hostname, &backend_ip); DCHECK(!backend_ip.empty()); - DCHECK(backend_map().find(backend_ip) != backend_map().end()); assignment_heap_.InsertOrUpdate(backend_ip, scan_range_length, GetBackendRank(backend_ip)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f5541d60/be/src/scheduling/simple-scheduler.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/simple-scheduler.h b/be/src/scheduling/simple-scheduler.h index a6ab93e..dd119c2 100644 --- a/be/src/scheduling/simple-scheduler.h +++ b/be/src/scheduling/simple-scheduler.h @@ -34,6 +34,7 @@ #include "util/metrics.h" #include "util/runtime-profile.h" #include "scheduling/admission-controller.h" +#include "scheduling/backend-config.h" #include "gen-cpp/Types_types.h" // for TNetworkAddress #include "gen-cpp/ResourceBrokerService_types.h" #include "rapidjson/rapidjson.h" @@ -95,55 +96,10 @@ class SimpleScheduler : public Scheduler { virtual void HandleLostResource(const TUniqueId& client_resource_id); private: - /// Type to store hostnames, which can be rfc1123 hostnames or IPv4 addresses. - typedef std::string Hostname; - - /// Type to store IPv4 addresses. - typedef std::string IpAddr; - - typedef std::list<TBackendDescriptor> BackendList; - - /// Map from a host's IP address to a list of backends running on that node. - typedef boost::unordered_map<IpAddr, BackendList> BackendMap; - /// Map from a host's IP address to the next backend to be round-robin scheduled for /// that host (needed for setups with multiple backends on a single host) - typedef boost::unordered_map<IpAddr, BackendList::const_iterator> NextBackendPerHost; - - /// Map from a hostname to its IP address to support hostname based backend lookup. 
- typedef boost::unordered_map<Hostname, IpAddr> BackendIpAddressMap; - - /// Configuration class to store a list of backends per IP address and a mapping from - /// hostnames to IP addresses. backend_ip_map contains entries for all backends in - /// backend_map and needs to be updated whenever backend_map changes. Each plan node - /// creates a read-only copy of the scheduler's current backend_config_ to use during - /// scheduling. - class BackendConfig { - public: - BackendConfig() {} - - /// Construct config from list of backends. - BackendConfig(const std::vector<TNetworkAddress>& backends); - - void AddBackend(const TBackendDescriptor& be_desc); - void RemoveBackend(const TBackendDescriptor& be_desc); - - /// Look up the IP address of 'hostname' in the internal backend maps and return - /// whether the lookup was successful. If 'hostname' itself is a valid IP address then - /// it is copied to 'ip' and true is returned. 'ip' can be NULL if the caller only - /// wants to check whether the lookup succeeds. Use this method to resolve datanode - /// hostnames to IP addresses during scheduling, to prevent blocking on the OS. - bool LookUpBackendIp(const Hostname& hostname, IpAddr* ip) const; - - int NumBackends() const { return backend_map().size(); } - - const BackendMap& backend_map() const { return backend_map_; } - const BackendIpAddressMap& backend_ip_map() const { return backend_ip_map_; } - - private: - BackendMap backend_map_; - BackendIpAddressMap backend_ip_map_; - }; + typedef boost::unordered_map<IpAddr, BackendConfig::BackendList::const_iterator> + NextBackendPerHost; typedef std::shared_ptr<const BackendConfig> BackendConfigPtr; @@ -250,7 +206,6 @@ class SimpleScheduler : public Scheduler { FragmentScanRangeAssignment* assignment); const BackendConfig& backend_config() const { return backend_config_; } - const BackendMap& backend_map() const { return backend_config_.backend_map(); } /// Print the assignment and statistics to VLOG_FILE. 
void PrintAssignment(const FragmentScanRangeAssignment& assignment); @@ -279,7 +234,7 @@ class SimpleScheduler : public Scheduler { int first_unused_backend_idx_; /// Store a random permutation of backend hosts to select backends from. - std::vector<const BackendMap::value_type*> random_backend_order_; + std::vector<IpAddr> random_backend_order_; /// Track round robin information per backend host. NextBackendPerHost next_backend_per_host_; @@ -301,7 +256,9 @@ class SimpleScheduler : public Scheduler { /// The scheduler's backend configuration. When receiving changes to the backend /// configuration from the statestore we will make a copy of the stored object, apply - /// the updates to the copy and atomically swap the contents of this pointer. + /// the updates to the copy and atomically swap the contents of this pointer. Each plan + /// node creates a read-only copy of the scheduler's current backend_config_ to use + /// during scheduling. BackendConfigPtr backend_config_; /// Protect access to backend_config_ which might otherwise be updated asynchronously @@ -382,9 +339,6 @@ class SimpleScheduler : public Scheduler { BackendConfigPtr GetBackendConfig() const; void SetBackendConfig(const BackendConfigPtr& backend_config); - /// Return a list of all backends registered with the scheduler. - void GetAllKnownBackends(BackendList* backends); - /// Add the granted reservation and resources to the active_reservations_ and /// active_client_resources_ maps, respectively. void AddToActiveResourceMaps( @@ -517,11 +471,6 @@ class SimpleScheduler : public Scheduler { int FindSenderFragment(TPlanNodeId exch_id, int fragment_idx, const TQueryExecRequest& exec_request); - /// Deterministically resolve a host to one of its IP addresses. This method will call - /// into the OS, so it can take a long time to return. Use this method to resolve - /// hostnames during initialization and while processing statestore updates. 
- static Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip); - friend class impala::SchedulerWrapper; FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentDeterministicNonCached); FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomNonCached); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f5541d60/be/src/util/network-util.cc ---------------------------------------------------------------------- diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc index bce350d..afa5b32 100644 --- a/be/src/util/network-util.cc +++ b/be/src/util/network-util.cc @@ -59,16 +59,18 @@ Status GetHostname(string* hostname) { return Status::OK(); } -Status HostnameToIpAddrs(const string& name, vector<string>* addresses) { +Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip){ + // Try to resolve via the operating system. + vector<IpAddr> addresses; addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); hints.ai_family = AF_INET; // IPv4 addresses only hints.ai_socktype = SOCK_STREAM; struct addrinfo* addr_info; - if (getaddrinfo(name.c_str(), NULL, &hints, &addr_info) != 0) { + if (getaddrinfo(hostname.c_str(), NULL, &hints, &addr_info) != 0) { stringstream ss; - ss << "Could not find IPv4 address for: " << name; + ss << "Could not find IPv4 address for: " << hostname; return Status(ss.str()); } @@ -79,15 +81,32 @@ Status HostnameToIpAddrs(const string& name, vector<string>* addresses) { inet_ntop(AF_INET, &((sockaddr_in*)it->ai_addr)->sin_addr, addr_buf, 64); if (result == NULL) { stringstream ss; - ss << "Could not convert IPv4 address for: " << name; + ss << "Could not convert IPv4 address for: " << hostname; freeaddrinfo(addr_info); return Status(ss.str()); } - addresses->push_back(string(addr_buf)); + addresses.push_back(string(addr_buf)); it = it->ai_next; } freeaddrinfo(addr_info); + + if (addresses.empty()) { + stringstream ss; + ss << "Could not convert IPv4 address for: " << hostname; + return Status(ss.str()); + } + + 
// RFC 3484 only specifies a partial order for the result of getaddrinfo() so we need to + // sort the addresses before picking the first non-localhost one. + sort(addresses.begin(), addresses.end()); + + // Try to find a non-localhost address, otherwise just use the first IP address + // returned. + *ip = addresses[0]; + if (!FindFirstNonLocalhost(addresses, ip)) { + VLOG(3) << "Only localhost addresses found for " << hostname; + } return Status::OK(); } @@ -128,6 +147,15 @@ TNetworkAddress MakeNetworkAddress(const string& address) { return ret; } +/// Utility method because Thrift does not supply useful constructors +TBackendDescriptor MakeBackendDescriptor(const Hostname& hostname, const IpAddr& ip, + int port) { + TBackendDescriptor be_desc; + be_desc.address = MakeNetworkAddress(hostname, port); + be_desc.ip_address = ip; + return be_desc; +} + bool IsWildcardAddress(const string& ipaddress) { return ipaddress == "0.0.0.0"; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f5541d60/be/src/util/network-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h index 57e95a1..315d451 100644 --- a/be/src/util/network-util.h +++ b/be/src/util/network-util.h @@ -16,15 +16,23 @@ // under the License. #include "common/status.h" +#include "gen-cpp/StatestoreService_types.h" #include "gen-cpp/Types_types.h" #include <vector> namespace impala { -/// Looks up all IP addresses associated with a given hostname. Returns -/// an error status if any system call failed, otherwise OK. Even if OK -/// is returned, addresses may still be of zero length. -Status HostnameToIpAddrs(const std::string& name, std::vector<std::string>* addresses); +/// Type to store hostnames, which can be rfc1123 hostnames or IPv4 addresses. +typedef std::string Hostname; + +/// Type to store IPv4 addresses. 
+typedef std::string IpAddr; + +/// Resolves 'hostname' with getaddrinfo() and returns one of its IPv4 addresses via +/// 'ip', preferring a non-localhost one. The addresses are sorted before picking, so as +/// long as they don't change, subsequent calls return the same address. Returns an error +/// status if a system call failed or no IPv4 address was found, otherwise OK. +Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip); /// Finds the first non-localhost IP address in the given list. Returns /// true if such an address was found, false otherwise. @@ -43,6 +51,10 @@ TNetworkAddress MakeNetworkAddress(const std::string& hostname, int port); /// hostname and a port of 0. TNetworkAddress MakeNetworkAddress(const std::string& address); +/// Utility method because Thrift does not supply useful constructors +TBackendDescriptor MakeBackendDescriptor(const Hostname& hostname, const IpAddr& ip, + int port); + /// Returns true if the ip address parameter is the wildcard interface (0.0.0.0) bool IsWildcardAddress(const std::string& ipaddress);
