IMPALA-4856, IMPALA-4872: Include KRPC services in plan fragment's destinations
This change allows Impala to publish the IP address and port information of KRPC services if it's enabled via the flag use_krpc. The information is included in a new field in the backend descriptor published as statestore updates. The scheduler will also include this information in the destinations of plan fragments. Also updated the mini-cluster startup script to assign KRPC ports to Impalad instances. This patch also takes into account a problem found in IMPALA-5795. In particular, the backend descriptor of the coordinator may not be found in the backend map in the scheduler if the coordinator is not an executor (i.e. a dedicated coordinator). The fix is to also check against the local backend descriptor. This patch is partially based on an abandoned patch by Henry Robinson. Testing done: ran core tests with a patch which ignores the use_krpc flag to exercise the code in the scheduler. Change-Id: I8707bfb5028bbe81d2a042fcf3e6e19f4b719a72 Reviewed-on: http://gerrit.cloudera.org:8080/7760 Reviewed-by: Michael Ho <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7b6ad283 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7b6ad283 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7b6ad283 Branch: refs/heads/master Commit: 7b6ad283f46d31aa953d4d44e142b0bc6b0afbb4 Parents: ec2ba02 Author: Michael Ho <[email protected]> Authored: Fri Aug 18 20:29:37 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Thu Aug 31 02:09:54 2017 +0000 ---------------------------------------------------------------------- be/src/common/global-flags.cc | 5 +- be/src/runtime/exec-env.cc | 22 +++++---- be/src/runtime/exec-env.h | 10 +++- be/src/scheduling/backend-config.cc | 16 ++++++- be/src/scheduling/backend-config.h | 11 +++-- be/src/scheduling/scheduler-test-util.cc | 7 ++- be/src/scheduling/scheduler.cc | 
63 +++++++++++++++++-------- be/src/scheduling/scheduler.h | 34 ++++++++----- be/src/service/impala-server.cc | 10 ++-- be/src/testutil/in-process-servers.cc | 13 +++-- be/src/testutil/in-process-servers.h | 2 +- be/src/util/network-util.cc | 6 +++ be/src/util/network-util.h | 3 ++ bin/start-impala-cluster.py | 4 +- common/thrift/ImpalaInternalService.thrift | 7 ++- common/thrift/StatestoreService.thrift | 5 +- 16 files changed, 158 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/common/global-flags.cc ---------------------------------------------------------------------- diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc index 9404148..ccffcb3 100644 --- a/be/src/common/global-flags.cc +++ b/be/src/common/global-flags.cc @@ -35,7 +35,10 @@ DEFINE_string(hostname, "", "Hostname to use for this daemon, also used as part "the Kerberos principal, if enabled. If not set, the system default will be" " used"); -DEFINE_int32(be_port, 22000, "port on which ImpalaInternalService is exported"); +DEFINE_int32(be_port, 22000, + "port on which thrift based ImpalaInternalService is exported"); +DEFINE_int32_hidden(krpc_port, 29000, + "port on which KRPC based ImpalaInternalService is exported"); // Kerberos is enabled if and only if principal is set. DEFINE_string(principal, "", "Kerberos principal. 
If set, both client and backend network" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/runtime/exec-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc index 02d07e8..0d7518a 100644 --- a/be/src/runtime/exec-env.cc +++ b/be/src/runtime/exec-env.cc @@ -76,7 +76,7 @@ DEFINE_int32(state_store_subscriber_port, 23000, DEFINE_int32(num_hdfs_worker_threads, 16, "(Advanced) The number of threads in the global HDFS operation pool"); DEFINE_bool(disable_admission_control, false, "Disables admission control."); -DEFINE_bool_hidden(use_krpc, false, "Used to indicate whether to use Kudu RPC for the " +DEFINE_bool_hidden(use_krpc, false, "Used to indicate whether to use KRPC for the " "DataStream subsystem, or the Thrift RPC layer instead. Defaults to false. " "KRPC not yet supported"); @@ -84,6 +84,7 @@ DECLARE_int32(state_store_port); DECLARE_int32(num_threads_per_core); DECLARE_int32(num_cores); DECLARE_int32(be_port); +DECLARE_int32(krpc_port); DECLARE_string(mem_limit); DECLARE_string(buffer_pool_limit); DECLARE_string(buffer_pool_clean_pages_limit); @@ -136,11 +137,13 @@ struct ExecEnv::KuduClientPtr { ExecEnv* ExecEnv::exec_env_ = nullptr; ExecEnv::ExecEnv() - : ExecEnv(FLAGS_hostname, FLAGS_be_port, FLAGS_state_store_subscriber_port, - FLAGS_webserver_port, FLAGS_state_store_host, FLAGS_state_store_port) {} + : ExecEnv(FLAGS_hostname, FLAGS_be_port, FLAGS_krpc_port, + FLAGS_state_store_subscriber_port, FLAGS_webserver_port, + FLAGS_state_store_host, FLAGS_state_store_port) {} -ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port, - int webserver_port, const string& statestore_host, int statestore_port) +ExecEnv::ExecEnv(const string& hostname, int backend_port, int krpc_port, + int subscriber_port, int webserver_port, const string& statestore_host, + int statestore_port) : obj_pool_(new ObjectPool), metrics_(new 
MetricGroup("impala-metrics")), impalad_client_cache_( @@ -165,7 +168,8 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port, async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)), query_exec_mgr_(new QueryExecMgr()), enable_webserver_(FLAGS_enable_webserver && webserver_port > 0), - backend_address_(MakeNetworkAddress(hostname, backend_port)) { + backend_address_(MakeNetworkAddress(hostname, backend_port)), + krpc_port_(krpc_port) { if (FLAGS_use_krpc) { stream_mgr_.reset(new KrpcDataStreamMgr(metrics_.get())); @@ -185,7 +189,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port, if (FLAGS_is_coordinator) { scheduler_.reset(new Scheduler(statestore_subscriber_.get(), - statestore_subscriber_->id(), backend_address_, metrics_.get(), webserver_.get(), + statestore_subscriber_->id(), metrics_.get(), webserver_.get(), request_pool_service_.get())); } @@ -332,7 +336,9 @@ Status ExecEnv::StartServices() { LOG(INFO) << "Not starting webserver"; } - if (scheduler_ != nullptr) RETURN_IF_ERROR(scheduler_->Init()); + if (scheduler_ != nullptr) { + RETURN_IF_ERROR(scheduler_->Init(backend_address_, krpc_port_)); + } if (admission_controller_ != nullptr) RETURN_IF_ERROR(admission_controller_->Init()); // Get the fs.defaultFS value set in core-site.xml and assign it to http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/runtime/exec-env.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h index 9e925a1..25d1055 100644 --- a/be/src/runtime/exec-env.h +++ b/be/src/runtime/exec-env.h @@ -69,8 +69,9 @@ class ExecEnv { public: ExecEnv(); - ExecEnv(const std::string& hostname, int backend_port, int subscriber_port, - int webserver_port, const std::string& statestore_host, int statestore_port); + ExecEnv(const std::string& hostname, int backend_port, int krpc_port, + int subscriber_port, 
int webserver_port, const std::string& statestore_host, + int statestore_port); /// Returns the first created exec env instance. In a normal impalad, this is /// the only instance. In test setups with multiple ExecEnv's per process, @@ -126,6 +127,8 @@ class ExecEnv { const TNetworkAddress& backend_address() const { return backend_address_; } + int krpc_port() const { return krpc_port_; } + /// Initializes the exec env for running FE tests. Status InitForFeTests() WARN_UNUSED_RESULT; @@ -187,6 +190,9 @@ class ExecEnv { /// Address of the Impala backend server instance TNetworkAddress backend_address_; + /// Port number on which all KRPC-based services are exported. + int krpc_port_; + /// fs.defaultFs value set in core-site.xml std::string default_fs_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/scheduling/backend-config.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/backend-config.cc b/be/src/scheduling/backend-config.cc index e5c6824..715ec34 100644 --- a/be/src/scheduling/backend-config.cc +++ b/be/src/scheduling/backend-config.cc @@ -75,15 +75,27 @@ void BackendConfig::RemoveBackend(const TBackendDescriptor& be_desc) { bool BackendConfig::LookUpBackendIp(const Hostname& hostname, IpAddr* ip) const { // Check if hostname is already a valid IP address. 
if (backend_map_.find(hostname) != backend_map_.end()) { - if (ip) *ip = hostname; + if (ip != nullptr) *ip = hostname; return true; } auto it = backend_ip_map_.find(hostname); if (it != backend_ip_map_.end()) { - if (ip) *ip = it->second; + if (ip != nullptr) *ip = it->second; return true; } return false; } +const TBackendDescriptor* BackendConfig::LookUpBackendDesc( + const TNetworkAddress& host) const { + IpAddr ip; + if (LIKELY(LookUpBackendIp(host.hostname, &ip))) { + const BackendConfig::BackendList& be_list = GetBackendListForHost(ip); + for (const TBackendDescriptor& desc : be_list) { + if (desc.address == host) return &desc; + } + } + return nullptr; +} + } // end ns impala http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/scheduling/backend-config.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/backend-config.h b/be/src/scheduling/backend-config.h index 25f8292..c7cbbcd 100644 --- a/be/src/scheduling/backend-config.h +++ b/be/src/scheduling/backend-config.h @@ -52,11 +52,16 @@ class BackendConfig { /// Look up the IP address of 'hostname' in the internal backend maps and return /// whether the lookup was successful. If 'hostname' itself is a valid IP address and /// is contained in backend_map_, then it is copied to 'ip' and true is returned. 'ip' - /// can be NULL if the caller only wants to check whether the lookup succeeds. Use this - /// method to resolve datanode hostnames to IP addresses during scheduling, to prevent - /// blocking on the OS. + /// can be nullptr if the caller only wants to check whether the lookup succeeds. Use + /// this method to resolve datanode hostnames to IP addresses during scheduling, to + /// prevent blocking on the OS. bool LookUpBackendIp(const Hostname& hostname, IpAddr* ip) const; + /// Look up the backend descriptor for the backend with hostname 'host'. + /// Returns nullptr if it's not found. 
The returned descriptor should not + /// be retained beyond the lifetime of this BackendConfig. + const TBackendDescriptor* LookUpBackendDesc(const TNetworkAddress& host) const; + int NumBackends() const { return backend_map_.size(); } private: http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/scheduling/scheduler-test-util.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc index b9b2c6e..841bc24 100644 --- a/be/src/scheduling/scheduler-test-util.cc +++ b/be/src/scheduling/scheduler-test-util.cc @@ -20,11 +20,14 @@ #include <boost/unordered_set.hpp> #include "common/names.h" +#include "runtime/exec-env.h" #include "scheduling/scheduler.h" using namespace impala; using namespace impala::test; +DECLARE_int32(krpc_port); + /// Sample 'n' elements without replacement from the set [0..N-1]. /// This is an implementation of "Algorithm R" by J. Vitter. void SampleN(int n, int N, vector<int>* out) { @@ -507,9 +510,9 @@ void SchedulerWrapper::InitializeScheduler() { scheduler_backend_address.hostname = scheduler_host.ip; scheduler_backend_address.port = scheduler_host.be_port; - scheduler_.reset(new Scheduler(nullptr, scheduler_backend_id, scheduler_backend_address, + scheduler_.reset(new Scheduler(nullptr, scheduler_backend_id, &metrics_, nullptr, nullptr)); - const Status status = scheduler_->Init(); + const Status status = scheduler_->Init(scheduler_backend_address, FLAGS_krpc_port); DCHECK(status.ok()) << "Scheduler init failed in test"; // Initialize the scheduler backend maps. 
SendFullMembershipMap(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/scheduling/scheduler.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc index 1c39230..f6e323f 100644 --- a/be/src/scheduling/scheduler.cc +++ b/be/src/scheduling/scheduler.cc @@ -43,8 +43,7 @@ using boost::algorithm::join; using namespace apache::thrift; using namespace strings; -DECLARE_int32(be_port); -DECLARE_string(hostname); +DECLARE_bool(use_krpc); namespace impala { @@ -56,28 +55,25 @@ static const string NUM_BACKENDS_KEY("simple-scheduler.num-backends"); const string Scheduler::IMPALA_MEMBERSHIP_TOPIC("impala-membership"); Scheduler::Scheduler(StatestoreSubscriber* subscriber, const string& backend_id, - const TNetworkAddress& backend_address, MetricGroup* metrics, Webserver* webserver, - RequestPoolService* request_pool_service) + MetricGroup* metrics, Webserver* webserver, RequestPoolService* request_pool_service) : executors_config_(std::make_shared<const BackendConfig>()), metrics_(metrics->GetOrCreateChildGroup("scheduler")), webserver_(webserver), statestore_subscriber_(subscriber), local_backend_id_(backend_id), thrift_serializer_(false), - total_assignments_(nullptr), - total_local_assignments_(nullptr), - initialized_(nullptr), request_pool_service_(request_pool_service) { - local_backend_descriptor_.address = backend_address; } -Status Scheduler::Init() { +Status Scheduler::Init(const TNetworkAddress& backend_address, int krpc_port) { LOG(INFO) << "Starting scheduler"; - // Figure out what our IP address is, so that each subscriber - // doesn't have to resolve it on every heartbeat. + // Figure out what our IP address is, so that each subscriber doesn't have to resolve + // it on every heartbeat. KRPC also assumes that the address is resolved already. + // May as well do it up front to avoid frequent DNS requests. 
+ local_backend_descriptor_.address = backend_address; IpAddr ip; - const Hostname& hostname = local_backend_descriptor_.address.hostname; + const Hostname& hostname = backend_address.hostname; Status status = HostnameToIpAddr(hostname, &ip); if (!status.ok()) { VLOG(1) << status.GetDetail(); @@ -88,6 +84,12 @@ Status Scheduler::Init() { local_backend_descriptor_.ip_address = ip; LOG(INFO) << "Scheduler using " << ip << " as IP address"; + if (FLAGS_use_krpc) { + // KRPC expects address to have been resolved already. + TNetworkAddress krpc_svc_addr = MakeNetworkAddress(ip, krpc_port); + local_backend_descriptor_.__set_krpc_address(krpc_svc_addr); + } + coord_only_backend_config_.AddBackend(local_backend_descriptor_); if (statestore_subscriber_ != nullptr) { @@ -222,10 +224,22 @@ void Scheduler::SetExecutorsConfig(const ExecutorsConfigPtr& executors_config) { executors_config_ = executors_config; } -Status Scheduler::ComputeScanRangeAssignment(QuerySchedule* schedule) { +const TBackendDescriptor& Scheduler::LookUpBackendDesc( + const BackendConfig& executor_config, const TNetworkAddress& host) { + const TBackendDescriptor* desc = executor_config.LookUpBackendDesc(host); + if (desc == nullptr) { + // Local host may not be in executor_config if it's a dedicated coordinator. 
+ DCHECK_EQ(host, local_backend_descriptor_.address); + DCHECK(!local_backend_descriptor_.is_executor); + desc = &local_backend_descriptor_; + } + return *desc; +} + +Status Scheduler::ComputeScanRangeAssignment( + const BackendConfig& executor_config, QuerySchedule* schedule) { RuntimeProfile::Counter* total_assignment_timer = ADD_TIMER(schedule->summary_profile(), "ComputeScanRangeAssignmentTimer"); - ExecutorsConfigPtr executor_config = GetExecutorsConfig(); const TQueryExecRequest& exec_request = schedule->request(); for (const TPlanExecInfo& plan_exec_info : exec_request.plan_exec_info) { for (const auto& entry : plan_exec_info.per_node_scan_ranges) { @@ -248,7 +262,7 @@ Status Scheduler::ComputeScanRangeAssignment(QuerySchedule* schedule) { FragmentScanRangeAssignment* assignment = &schedule->GetFragmentExecParams(fragment.idx)->scan_range_assignment; RETURN_IF_ERROR( - ComputeScanRangeAssignment(*executor_config, node_id, node_replica_preference, + ComputeScanRangeAssignment(executor_config, node_id, node_replica_preference, node_random_replica, entry.second, exec_request.host_list, exec_at_coord, schedule->query_options(), total_assignment_timer, assignment)); schedule->IncNumScanRanges(entry.second.size()); @@ -257,7 +271,8 @@ Status Scheduler::ComputeScanRangeAssignment(QuerySchedule* schedule) { return Status::OK(); } -void Scheduler::ComputeFragmentExecParams(QuerySchedule* schedule) { +void Scheduler::ComputeFragmentExecParams( + const BackendConfig& executor_config, QuerySchedule* schedule) { const TQueryExecRequest& exec_request = schedule->request(); // for each plan, compute the FInstanceExecParams for the tree of fragments @@ -282,7 +297,14 @@ void Scheduler::ComputeFragmentExecParams(QuerySchedule* schedule) { for (int i = 0; i < dest_params->instance_exec_params.size(); ++i) { TPlanFragmentDestination& dest = src_params->destinations[i]; dest.__set_fragment_instance_id(dest_params->instance_exec_params[i].instance_id); - 
dest.__set_server(dest_params->instance_exec_params[i].host); + const TNetworkAddress& host = dest_params->instance_exec_params[i].host; + dest.__set_server(host); + if (FLAGS_use_krpc) { + const TBackendDescriptor& desc = LookUpBackendDesc(executor_config, host); + DCHECK(desc.__isset.krpc_address); + DCHECK(IsResolvedAddress(desc.krpc_address)); + dest.__set_krpc_server(desc.krpc_address); + } } // enumerate senders consecutively; @@ -679,8 +701,11 @@ void Scheduler::GetScanHosts(TPlanNodeId scan_id, const FragmentExecParams& para } Status Scheduler::Schedule(QuerySchedule* schedule) { - RETURN_IF_ERROR(ComputeScanRangeAssignment(schedule)); - ComputeFragmentExecParams(schedule); + // Make a copy of the executor_config upfront to avoid using inconsistent views + // between ComputeScanRangeAssignment() and ComputeFragmentExecParams(). + ExecutorsConfigPtr config_ptr = GetExecutorsConfig(); + RETURN_IF_ERROR(ComputeScanRangeAssignment(*config_ptr, schedule)); + ComputeFragmentExecParams(*config_ptr, schedule); ComputeBackendExecParams(schedule); #ifndef NDEBUG schedule->Validate(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/scheduling/scheduler.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h index 07e0cfc..d1a22f8 100644 --- a/be/src/scheduling/scheduler.h +++ b/be/src/scheduling/scheduler.h @@ -78,15 +78,17 @@ class Scheduler { /// Initialize with a subscription manager that we can register with for updates to the /// set of available backends. 
/// - backend_id - unique identifier for this Impala backend (usually a host:port) - /// - backend_address - the address that this backend listens on Scheduler(StatestoreSubscriber* subscriber, const std::string& backend_id, - const TNetworkAddress& backend_address, MetricGroup* metrics, Webserver* webserver, + MetricGroup* metrics, Webserver* webserver, RequestPoolService* request_pool_service); - /// Initialises the scheduler, acquiring all resources needed to make scheduling + /// Initializes the scheduler, acquiring all resources needed to make scheduling /// decisions once this method returns. Register with the subscription manager if - /// required. - impala::Status Init(); + /// required. Also initializes the local backend descriptor. Returns error status + /// on failure. 'backend_address' is the address of thrift based ImpalaInternalService + /// of this backend. 'krpc_port' is the port on which KRPC based ImpalaInternalService + /// is exported. + Status Init(const TNetworkAddress& backend_address, int krpc_port); /// Populates given query schedule and assigns fragments to hosts based on scan /// ranges in the query exec request. @@ -300,14 +302,14 @@ class Scheduler { ThriftSerializer thrift_serializer_; /// Locality metrics - IntCounter* total_assignments_; - IntCounter* total_local_assignments_; + IntCounter* total_assignments_ = nullptr; + IntCounter* total_local_assignments_ = nullptr; /// Initialization metric - BooleanProperty* initialized_; + BooleanProperty* initialized_ = nullptr; /// Current number of executors - IntGauge* num_fragment_instances_metric_; + IntGauge* num_fragment_instances_metric_ = nullptr; /// Used for user-to-pool resolution and looking up pool configurations. Not owned by /// us. 
@@ -318,6 +320,12 @@ class Scheduler { ExecutorsConfigPtr GetExecutorsConfig() const; void SetExecutorsConfig(const ExecutorsConfigPtr& executors_config); + /// Returns the backend descriptor corresponding to 'host' which could be a remote + /// backend or the local host itself. The returned descriptor should not be retained + /// beyond the lifetime of 'executor_config'. + const TBackendDescriptor& LookUpBackendDesc( + const BackendConfig& executor_config, const TNetworkAddress& host); + /// Called asynchronously when an update is received from the subscription manager void UpdateMembership(const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas, std::vector<TTopicDelta>* subscriber_topic_updates); @@ -331,7 +339,9 @@ class Scheduler { /// Unpartitioned fragments are assigned to the coordinator. Populate the schedule's /// fragment_exec_params_ with the resulting scan range assignment. /// We have a benchmark for this method in be/src/benchmarks/scheduler-benchmark.cc. - Status ComputeScanRangeAssignment(QuerySchedule* schedule); + /// 'executor_config' is the executor configuration to use for scheduling. + Status ComputeScanRangeAssignment(const BackendConfig& executor_config, + QuerySchedule* schedule); /// Process the list of scan ranges of a single plan node and compute scan range /// assignments (returned in 'assignment'). The result is a mapping from hosts to their @@ -407,7 +417,9 @@ class Scheduler { /// TQueryExecRequest.plan_exec_info. /// This includes the routing information (destinations, per_exch_num_senders, /// sender_id) - void ComputeFragmentExecParams(QuerySchedule* schedule); + /// 'executor_config' is the executor configuration to use for scheduling. + void ComputeFragmentExecParams(const BackendConfig& executor_config, + QuerySchedule* schedule); /// Recursively create FInstanceExecParams and set per_node_scan_ranges for /// fragment_params and its input fragments via a depth-first traversal. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index fb1d0e6..4d55c85 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -110,6 +110,7 @@ DECLARE_string(authorized_proxy_user_config); DECLARE_string(authorized_proxy_user_config_delimiter); DECLARE_bool(abort_on_config_error); DECLARE_bool(disk_spill_encryption); +DECLARE_bool(use_krpc); DEFINE_int32(beeswax_port, 21000, "port on which Beeswax client requests are served"); DEFINE_int32(hs2_port, 21050, "port on which HiveServer2 client requests are served"); @@ -921,7 +922,7 @@ void ImpalaServer::PrepareQueryContext(TQueryCtx* query_ctx) { local_timestamp.UtcToLocal(); query_ctx->__set_now_string(local_timestamp.ToString()); query_ctx->__set_start_unix_millis(UnixMillis()); - query_ctx->__set_coord_address(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)); + query_ctx->__set_coord_address(ExecEnv::GetInstance()->backend_address()); // Creating a random_generator every time is not free, but // benchmarks show it to be slightly cheaper than contending for a @@ -1641,8 +1642,7 @@ void ImpalaServer::AddLocalBackendToStatestore( TBackendDescriptor local_backend_descriptor; local_backend_descriptor.__set_is_coordinator(FLAGS_is_coordinator); local_backend_descriptor.__set_is_executor(FLAGS_is_executor); - local_backend_descriptor.__set_address( - MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)); + local_backend_descriptor.__set_address(exec_env_->backend_address()); IpAddr ip; const Hostname& hostname = local_backend_descriptor.address.hostname; Status status = HostnameToIpAddr(hostname, &ip); @@ -1653,6 +1653,10 @@ void ImpalaServer::AddLocalBackendToStatestore( return; } local_backend_descriptor.ip_address = ip; + if (FLAGS_use_krpc) { + TNetworkAddress krpc_address = 
MakeNetworkAddress(ip, exec_env_->krpc_port()); + local_backend_descriptor.__set_krpc_address(krpc_address); + } subscriber_topic_updates->emplace_back(TTopicDelta()); TTopicDelta& update = subscriber_topic_updates->back(); update.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/testutil/in-process-servers.cc ---------------------------------------------------------------------- diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc index 4fecfb3..5b2a4e5 100644 --- a/be/src/testutil/in-process-servers.cc +++ b/be/src/testutil/in-process-servers.cc @@ -35,6 +35,7 @@ DECLARE_string(ssl_server_certificate); DECLARE_string(ssl_private_key); DECLARE_int32(be_port); +DECLARE_int32(krpc_port); using namespace apache::thrift; using namespace impala; @@ -49,6 +50,10 @@ InProcessImpalaServer* InProcessImpalaServer::StartWithEphemeralPorts( // backend interface. FLAGS_be_port = backend_port; + int krpc_port = FindUnusedEphemeralPort(&used_ports); + if (krpc_port == -1) continue; + FLAGS_krpc_port = krpc_port; + int subscriber_port = FindUnusedEphemeralPort(&used_ports); if (subscriber_port == -1) continue; @@ -62,7 +67,7 @@ InProcessImpalaServer* InProcessImpalaServer::StartWithEphemeralPorts( if (hs2_port == -1) continue; InProcessImpalaServer* impala = - new InProcessImpalaServer("localhost", backend_port, subscriber_port, + new InProcessImpalaServer("localhost", backend_port, krpc_port, subscriber_port, webserver_port, statestore_host, statestore_port); // Start the daemon and check if it works, if not delete the current server object and // pick a new set of ports @@ -79,14 +84,14 @@ InProcessImpalaServer* InProcessImpalaServer::StartWithEphemeralPorts( } InProcessImpalaServer::InProcessImpalaServer(const string& hostname, int backend_port, - int subscriber_port, int webserver_port, const string& statestore_host, + int krpc_port, int subscriber_port, int 
webserver_port, const string& statestore_host, int statestore_port) : hostname_(hostname), backend_port_(backend_port), beeswax_port_(0), hs2_port_(0), impala_server_(NULL), - exec_env_(new ExecEnv(hostname, backend_port, subscriber_port, webserver_port, - statestore_host, statestore_port)) { + exec_env_(new ExecEnv(hostname, backend_port, krpc_port, subscriber_port, + webserver_port, statestore_host, statestore_port)) { } Status InProcessImpalaServer::SetCatalogInitialized() { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/testutil/in-process-servers.h ---------------------------------------------------------------------- diff --git a/be/src/testutil/in-process-servers.h b/be/src/testutil/in-process-servers.h index 03b02f3..d22c441 100644 --- a/be/src/testutil/in-process-servers.h +++ b/be/src/testutil/in-process-servers.h @@ -42,7 +42,7 @@ class InProcessImpalaServer { public: /// Initialises the server, but does not start any network-attached /// services or run any threads. 
- InProcessImpalaServer(const std::string& hostname, int backend_port, + InProcessImpalaServer(const std::string& hostname, int backend_port, int krpc_port, int subscriber_port, int webserver_port, const std::string& statestore_host, int statestore_port); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/util/network-util.cc ---------------------------------------------------------------------- diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc index 6228a0b..1a9ce53 100644 --- a/be/src/util/network-util.cc +++ b/be/src/util/network-util.cc @@ -28,6 +28,7 @@ #include <vector> #include <boost/algorithm/string.hpp> +#include "kudu/util/net/sockaddr.h" #include "util/debug-util.h" #include "util/error-util.h" #include <util/string-parser.h> @@ -113,6 +114,11 @@ Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip){ return Status::OK(); } +bool IsResolvedAddress(const TNetworkAddress& addr) { + kudu::Sockaddr sock; + return sock.ParseString(addr.hostname, addr.port).ok(); +} + bool FindFirstNonLocalhost(const vector<string>& addresses, string* addr) { for (const string& candidate: addresses) { if (candidate != LOCALHOST) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/util/network-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h index 1783589..e964f5c 100644 --- a/be/src/util/network-util.h +++ b/be/src/util/network-util.h @@ -28,6 +28,9 @@ typedef std::string Hostname; /// Type to store IPv4 addresses. typedef std::string IpAddr; +/// Returns true if 'addr' is a fully resolved IP address, rather than a fqdn + port. +bool IsResolvedAddress(const TNetworkAddress& addr); + /// Looks up all IP addresses associated with a given hostname and returns one of them via /// 'address'. If the IP addresses of a host don't change, then subsequent calls will /// always return the same address. 
Returns an error status if any system call failed, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/bin/start-impala-cluster.py ---------------------------------------------------------------------- diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py index c253ca8..af960d9 100755 --- a/bin/start-impala-cluster.py +++ b/bin/start-impala-cluster.py @@ -94,7 +94,7 @@ CATALOGD_PATH = os.path.join(IMPALA_HOME, MINI_IMPALA_CLUSTER_PATH = IMPALAD_PATH + " -in-process" IMPALA_SHELL = os.path.join(IMPALA_HOME, 'bin/impala-shell.sh') -IMPALAD_PORTS = ("-beeswax_port=%d -hs2_port=%d -be_port=%d " +IMPALAD_PORTS = ("-beeswax_port=%d -hs2_port=%d -be_port=%d -krpc_port=%d " "-state_store_subscriber_port=%d -webserver_port=%d") JVM_ARGS = "-jvm_debug_port=%s -jvm_args=%s" BE_LOGGING_ARGS = "-log_filename=%s -log_dir=%s -v=%s -logbufsecs=5 -max_log_files=%s" @@ -187,10 +187,12 @@ def build_impalad_port_args(instance_num): BASE_BEESWAX_PORT = 21000 BASE_HS2_PORT = 21050 BASE_BE_PORT = 22000 + BASE_KRPC_PORT = 29000 BASE_STATE_STORE_SUBSCRIBER_PORT = 23000 BASE_WEBSERVER_PORT = 25000 return IMPALAD_PORTS % (BASE_BEESWAX_PORT + instance_num, BASE_HS2_PORT + instance_num, BASE_BE_PORT + instance_num, + BASE_KRPC_PORT + instance_num, BASE_STATE_STORE_SUBSCRIBER_PORT + instance_num, BASE_WEBSERVER_PORT + instance_num) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/common/thrift/ImpalaInternalService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index 54fcdaf..39df289 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -350,7 +350,7 @@ struct TQueryCtx { // Process ID of the impalad to which the user is connected. 5: required i32 pid - // Initiating coordinator. 
+ // The initiating coordinator's address of its thrift based ImpalaInternalService. // TODO: determine whether we can get this somehow via the Thrift rpc mechanism. 6: optional Types.TNetworkAddress coord_address @@ -404,8 +404,11 @@ struct TPlanFragmentDestination { // the globally unique fragment instance id 1: required Types.TUniqueId fragment_instance_id - // ... which is being executed on this server + // IP address + port of the thrift based ImpalaInternalService on the destination 2: required Types.TNetworkAddress server + + // IP address + port of the KRPC based ImpalaInternalService on the destination + 3: optional Types.TNetworkAddress krpc_server } // Context to collect information, which is shared among all instances of that plan http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/common/thrift/StatestoreService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift index 90400b2..60a0d0d 100644 --- a/common/thrift/StatestoreService.thrift +++ b/common/thrift/StatestoreService.thrift @@ -50,7 +50,7 @@ struct TPoolStats { // all other Impalads in the cluster. Impalads can act as coordinators, executors or // both. struct TBackendDescriptor { - // Network address of the Impala service on this backend + // Network address of the thrift based ImpalaInternalService on this backend 1: required Types.TNetworkAddress address; // IP address corresponding to address.hostname. Explicitly including this saves the @@ -68,6 +68,9 @@ struct TBackendDescriptor { // True if the debug webserver is secured (for correctly generating links) 6: optional bool secure_webserver; + + // IP address + port of KRPC based ImpalaInternalService on this backend + 7: optional Types.TNetworkAddress krpc_address; } // Description of a single entry in a topic
