IMPALA-4856, IMPALA-4872: Include KRPC services in plan fragment's destinations

This change allows Impala to publish the IP address and port
information of KRPC services if it's enabled via the flag
use_krpc. The information is included in a new field in the
backend descriptor published as statestore updates. Scheduler
will also include this information in the destinations of plan
fragments. Also updated the mini-cluster startup script to assign
KRPC ports to Impalad instances.

This patch also takes into account of a problem found in
IMPALA-5795. In particular, the backend descriptor of the
coordinator may not be found in the backend map in the
scheduler if coordinator is not an executor (i.e. dedicated
coordinator). The fix to also check against the local backend
descriptor.

This patch is partially based on an abandoned patch by Henry Robinson.

Testing done: ran core tests with a patch which ignores the use_krpc
flag to exercise the code in scheduler.

Change-Id: I8707bfb5028bbe81d2a042fcf3e6e19f4b719a72
Reviewed-on: http://gerrit.cloudera.org:8080/7760
Reviewed-by: Michael Ho <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7b6ad283
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7b6ad283
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7b6ad283

Branch: refs/heads/master
Commit: 7b6ad283f46d31aa953d4d44e142b0bc6b0afbb4
Parents: ec2ba02
Author: Michael Ho <[email protected]>
Authored: Fri Aug 18 20:29:37 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Aug 31 02:09:54 2017 +0000

----------------------------------------------------------------------
 be/src/common/global-flags.cc              |  5 +-
 be/src/runtime/exec-env.cc                 | 22 +++++----
 be/src/runtime/exec-env.h                  | 10 +++-
 be/src/scheduling/backend-config.cc        | 16 ++++++-
 be/src/scheduling/backend-config.h         | 11 +++--
 be/src/scheduling/scheduler-test-util.cc   |  7 ++-
 be/src/scheduling/scheduler.cc             | 63 +++++++++++++++++--------
 be/src/scheduling/scheduler.h              | 34 ++++++++-----
 be/src/service/impala-server.cc            | 10 ++--
 be/src/testutil/in-process-servers.cc      | 13 +++--
 be/src/testutil/in-process-servers.h       |  2 +-
 be/src/util/network-util.cc                |  6 +++
 be/src/util/network-util.h                 |  3 ++
 bin/start-impala-cluster.py                |  4 +-
 common/thrift/ImpalaInternalService.thrift |  7 ++-
 common/thrift/StatestoreService.thrift     |  5 +-
 16 files changed, 158 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 9404148..ccffcb3 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -35,7 +35,10 @@ DEFINE_string(hostname, "", "Hostname to use for this 
daemon, also used as part
               "the Kerberos principal, if enabled. If not set, the system 
default will be"
               " used");
 
-DEFINE_int32(be_port, 22000, "port on which ImpalaInternalService is 
exported");
+DEFINE_int32(be_port, 22000,
+    "port on which thrift based ImpalaInternalService is exported");
+DEFINE_int32_hidden(krpc_port, 29000,
+    "port on which KRPC based ImpalaInternalService is exported");
 
 // Kerberos is enabled if and only if principal is set.
 DEFINE_string(principal, "", "Kerberos principal. If set, both client and 
backend network"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 02d07e8..0d7518a 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -76,7 +76,7 @@ DEFINE_int32(state_store_subscriber_port, 23000,
 DEFINE_int32(num_hdfs_worker_threads, 16,
     "(Advanced) The number of threads in the global HDFS operation pool");
 DEFINE_bool(disable_admission_control, false, "Disables admission control.");
-DEFINE_bool_hidden(use_krpc, false, "Used to indicate whether to use Kudu RPC 
for the "
+DEFINE_bool_hidden(use_krpc, false, "Used to indicate whether to use KRPC for 
the "
     "DataStream subsystem, or the Thrift RPC layer instead. Defaults to false. 
"
     "KRPC not yet supported");
 
@@ -84,6 +84,7 @@ DECLARE_int32(state_store_port);
 DECLARE_int32(num_threads_per_core);
 DECLARE_int32(num_cores);
 DECLARE_int32(be_port);
+DECLARE_int32(krpc_port);
 DECLARE_string(mem_limit);
 DECLARE_string(buffer_pool_limit);
 DECLARE_string(buffer_pool_clean_pages_limit);
@@ -136,11 +137,13 @@ struct ExecEnv::KuduClientPtr {
 ExecEnv* ExecEnv::exec_env_ = nullptr;
 
 ExecEnv::ExecEnv()
-  : ExecEnv(FLAGS_hostname, FLAGS_be_port, FLAGS_state_store_subscriber_port,
-        FLAGS_webserver_port, FLAGS_state_store_host, FLAGS_state_store_port) 
{}
+  : ExecEnv(FLAGS_hostname, FLAGS_be_port, FLAGS_krpc_port,
+        FLAGS_state_store_subscriber_port, FLAGS_webserver_port,
+        FLAGS_state_store_host, FLAGS_state_store_port) {}
 
-ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
-    int webserver_port, const string& statestore_host, int statestore_port)
+ExecEnv::ExecEnv(const string& hostname, int backend_port, int krpc_port,
+    int subscriber_port, int webserver_port, const string& statestore_host,
+    int statestore_port)
   : obj_pool_(new ObjectPool),
     metrics_(new MetricGroup("impala-metrics")),
     impalad_client_cache_(
@@ -165,7 +168,8 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, 
int subscriber_port,
     async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 
10000)),
     query_exec_mgr_(new QueryExecMgr()),
     enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
-    backend_address_(MakeNetworkAddress(hostname, backend_port)) {
+    backend_address_(MakeNetworkAddress(hostname, backend_port)),
+    krpc_port_(krpc_port) {
 
   if (FLAGS_use_krpc) {
     stream_mgr_.reset(new KrpcDataStreamMgr(metrics_.get()));
@@ -185,7 +189,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, 
int subscriber_port,
 
   if (FLAGS_is_coordinator) {
     scheduler_.reset(new Scheduler(statestore_subscriber_.get(),
-        statestore_subscriber_->id(), backend_address_, metrics_.get(), 
webserver_.get(),
+        statestore_subscriber_->id(), metrics_.get(), webserver_.get(),
         request_pool_service_.get()));
   }
 
@@ -332,7 +336,9 @@ Status ExecEnv::StartServices() {
     LOG(INFO) << "Not starting webserver";
   }
 
-  if (scheduler_ != nullptr) RETURN_IF_ERROR(scheduler_->Init());
+  if (scheduler_ != nullptr) {
+    RETURN_IF_ERROR(scheduler_->Init(backend_address_, krpc_port_));
+  }
   if (admission_controller_ != nullptr) 
RETURN_IF_ERROR(admission_controller_->Init());
 
   // Get the fs.defaultFS value set in core-site.xml and assign it to

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 9e925a1..25d1055 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -69,8 +69,9 @@ class ExecEnv {
  public:
   ExecEnv();
 
-  ExecEnv(const std::string& hostname, int backend_port, int subscriber_port,
-      int webserver_port, const std::string& statestore_host, int 
statestore_port);
+  ExecEnv(const std::string& hostname, int backend_port, int krpc_port,
+      int subscriber_port, int webserver_port, const std::string& 
statestore_host,
+      int statestore_port);
 
   /// Returns the first created exec env instance. In a normal impalad, this is
   /// the only instance. In test setups with multiple ExecEnv's per process,
@@ -126,6 +127,8 @@ class ExecEnv {
 
   const TNetworkAddress& backend_address() const { return backend_address_; }
 
+  int krpc_port() const { return krpc_port_; }
+
   /// Initializes the exec env for running FE tests.
   Status InitForFeTests() WARN_UNUSED_RESULT;
 
@@ -187,6 +190,9 @@ class ExecEnv {
   /// Address of the Impala backend server instance
   TNetworkAddress backend_address_;
 
+  /// Port number on which all KRPC-based services are exported.
+  int krpc_port_;
+
   /// fs.defaultFs value set in core-site.xml
   std::string default_fs_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/scheduling/backend-config.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/backend-config.cc 
b/be/src/scheduling/backend-config.cc
index e5c6824..715ec34 100644
--- a/be/src/scheduling/backend-config.cc
+++ b/be/src/scheduling/backend-config.cc
@@ -75,15 +75,27 @@ void BackendConfig::RemoveBackend(const TBackendDescriptor& 
be_desc) {
 bool BackendConfig::LookUpBackendIp(const Hostname& hostname, IpAddr* ip) 
const {
   // Check if hostname is already a valid IP address.
   if (backend_map_.find(hostname) != backend_map_.end()) {
-    if (ip) *ip = hostname;
+    if (ip != nullptr) *ip = hostname;
     return true;
   }
   auto it = backend_ip_map_.find(hostname);
   if (it != backend_ip_map_.end()) {
-    if (ip) *ip = it->second;
+    if (ip != nullptr) *ip = it->second;
     return true;
   }
   return false;
 }
 
+const TBackendDescriptor* BackendConfig::LookUpBackendDesc(
+    const TNetworkAddress& host) const {
+  IpAddr ip;
+  if (LIKELY(LookUpBackendIp(host.hostname, &ip))) {
+    const BackendConfig::BackendList& be_list = GetBackendListForHost(ip);
+    for (const TBackendDescriptor& desc : be_list) {
+      if (desc.address == host) return &desc;
+    }
+  }
+  return nullptr;
+}
+
 }  // end ns impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/scheduling/backend-config.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/backend-config.h 
b/be/src/scheduling/backend-config.h
index 25f8292..c7cbbcd 100644
--- a/be/src/scheduling/backend-config.h
+++ b/be/src/scheduling/backend-config.h
@@ -52,11 +52,16 @@ class BackendConfig {
   /// Look up the IP address of 'hostname' in the internal backend maps and 
return
   /// whether the lookup was successful. If 'hostname' itself is a valid IP 
address and
   /// is contained in backend_map_, then it is copied to 'ip' and true is 
returned. 'ip'
-  /// can be NULL if the caller only wants to check whether the lookup 
succeeds. Use this
-  /// method to resolve datanode hostnames to IP addresses during scheduling, 
to prevent
-  /// blocking on the OS.
+  /// can be nullptr if the caller only wants to check whether the lookup 
succeeds. Use
+  /// this method to resolve datanode hostnames to IP addresses during 
scheduling, to
+  /// prevent blocking on the OS.
   bool LookUpBackendIp(const Hostname& hostname, IpAddr* ip) const;
 
+  /// Look up the backend descriptor for the backend with hostname 'host'.
+  /// Returns nullptr if it's not found. The returned descriptor should not
+  /// be retained beyond the lifetime of this BackendConfig.
+  const TBackendDescriptor* LookUpBackendDesc(const TNetworkAddress& host) 
const;
+
   int NumBackends() const { return backend_map_.size(); }
 
  private:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/scheduling/scheduler-test-util.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test-util.cc 
b/be/src/scheduling/scheduler-test-util.cc
index b9b2c6e..841bc24 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -20,11 +20,14 @@
 #include <boost/unordered_set.hpp>
 
 #include "common/names.h"
+#include "runtime/exec-env.h"
 #include "scheduling/scheduler.h"
 
 using namespace impala;
 using namespace impala::test;
 
+DECLARE_int32(krpc_port);
+
 /// Sample 'n' elements without replacement from the set [0..N-1].
 /// This is an implementation of "Algorithm R" by J. Vitter.
 void SampleN(int n, int N, vector<int>* out) {
@@ -507,9 +510,9 @@ void SchedulerWrapper::InitializeScheduler() {
   scheduler_backend_address.hostname = scheduler_host.ip;
   scheduler_backend_address.port = scheduler_host.be_port;
 
-  scheduler_.reset(new Scheduler(nullptr, scheduler_backend_id, 
scheduler_backend_address,
+  scheduler_.reset(new Scheduler(nullptr, scheduler_backend_id,
       &metrics_, nullptr, nullptr));
-  const Status status = scheduler_->Init();
+  const Status status = scheduler_->Init(scheduler_backend_address, 
FLAGS_krpc_port);
   DCHECK(status.ok()) << "Scheduler init failed in test";
   // Initialize the scheduler backend maps.
   SendFullMembershipMap();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 1c39230..f6e323f 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -43,8 +43,7 @@ using boost::algorithm::join;
 using namespace apache::thrift;
 using namespace strings;
 
-DECLARE_int32(be_port);
-DECLARE_string(hostname);
+DECLARE_bool(use_krpc);
 
 namespace impala {
 
@@ -56,28 +55,25 @@ static const string 
NUM_BACKENDS_KEY("simple-scheduler.num-backends");
 const string Scheduler::IMPALA_MEMBERSHIP_TOPIC("impala-membership");
 
 Scheduler::Scheduler(StatestoreSubscriber* subscriber, const string& 
backend_id,
-    const TNetworkAddress& backend_address, MetricGroup* metrics, Webserver* 
webserver,
-    RequestPoolService* request_pool_service)
+    MetricGroup* metrics, Webserver* webserver, RequestPoolService* 
request_pool_service)
   : executors_config_(std::make_shared<const BackendConfig>()),
     metrics_(metrics->GetOrCreateChildGroup("scheduler")),
     webserver_(webserver),
     statestore_subscriber_(subscriber),
     local_backend_id_(backend_id),
     thrift_serializer_(false),
-    total_assignments_(nullptr),
-    total_local_assignments_(nullptr),
-    initialized_(nullptr),
     request_pool_service_(request_pool_service) {
-  local_backend_descriptor_.address = backend_address;
 }
 
-Status Scheduler::Init() {
+Status Scheduler::Init(const TNetworkAddress& backend_address, int krpc_port) {
   LOG(INFO) << "Starting scheduler";
 
-  // Figure out what our IP address is, so that each subscriber
-  // doesn't have to resolve it on every heartbeat.
+  // Figure out what our IP address is, so that each subscriber doesn't have 
to resolve
+  // it on every heartbeat. KRPC also assumes that the address is resolved 
already.
+  // May as well do it up front to avoid frequent DNS requests.
+  local_backend_descriptor_.address = backend_address;
   IpAddr ip;
-  const Hostname& hostname = local_backend_descriptor_.address.hostname;
+  const Hostname& hostname = backend_address.hostname;
   Status status = HostnameToIpAddr(hostname, &ip);
   if (!status.ok()) {
     VLOG(1) << status.GetDetail();
@@ -88,6 +84,12 @@ Status Scheduler::Init() {
   local_backend_descriptor_.ip_address = ip;
   LOG(INFO) << "Scheduler using " << ip << " as IP address";
 
+  if (FLAGS_use_krpc) {
+    // KRPC expects address to have been resolved already.
+    TNetworkAddress krpc_svc_addr = MakeNetworkAddress(ip, krpc_port);
+    local_backend_descriptor_.__set_krpc_address(krpc_svc_addr);
+  }
+
   coord_only_backend_config_.AddBackend(local_backend_descriptor_);
 
   if (statestore_subscriber_ != nullptr) {
@@ -222,10 +224,22 @@ void Scheduler::SetExecutorsConfig(const 
ExecutorsConfigPtr& executors_config) {
   executors_config_ = executors_config;
 }
 
-Status Scheduler::ComputeScanRangeAssignment(QuerySchedule* schedule) {
+const TBackendDescriptor& Scheduler::LookUpBackendDesc(
+    const BackendConfig& executor_config, const TNetworkAddress& host) {
+  const TBackendDescriptor* desc = executor_config.LookUpBackendDesc(host);
+  if (desc == nullptr) {
+    // Local host may not be in executor_config if it's a dedicated 
coordinator.
+    DCHECK_EQ(host, local_backend_descriptor_.address);
+    DCHECK(!local_backend_descriptor_.is_executor);
+    desc = &local_backend_descriptor_;
+  }
+  return *desc;
+}
+
+Status Scheduler::ComputeScanRangeAssignment(
+    const BackendConfig& executor_config, QuerySchedule* schedule) {
   RuntimeProfile::Counter* total_assignment_timer =
       ADD_TIMER(schedule->summary_profile(), 
"ComputeScanRangeAssignmentTimer");
-  ExecutorsConfigPtr executor_config = GetExecutorsConfig();
   const TQueryExecRequest& exec_request = schedule->request();
   for (const TPlanExecInfo& plan_exec_info : exec_request.plan_exec_info) {
     for (const auto& entry : plan_exec_info.per_node_scan_ranges) {
@@ -248,7 +262,7 @@ Status Scheduler::ComputeScanRangeAssignment(QuerySchedule* 
schedule) {
       FragmentScanRangeAssignment* assignment =
           
&schedule->GetFragmentExecParams(fragment.idx)->scan_range_assignment;
       RETURN_IF_ERROR(
-          ComputeScanRangeAssignment(*executor_config, node_id, 
node_replica_preference,
+          ComputeScanRangeAssignment(executor_config, node_id, 
node_replica_preference,
               node_random_replica, entry.second, exec_request.host_list, 
exec_at_coord,
               schedule->query_options(), total_assignment_timer, assignment));
       schedule->IncNumScanRanges(entry.second.size());
@@ -257,7 +271,8 @@ Status Scheduler::ComputeScanRangeAssignment(QuerySchedule* 
schedule) {
   return Status::OK();
 }
 
-void Scheduler::ComputeFragmentExecParams(QuerySchedule* schedule) {
+void Scheduler::ComputeFragmentExecParams(
+    const BackendConfig& executor_config, QuerySchedule* schedule) {
   const TQueryExecRequest& exec_request = schedule->request();
 
   // for each plan, compute the FInstanceExecParams for the tree of fragments
@@ -282,7 +297,14 @@ void Scheduler::ComputeFragmentExecParams(QuerySchedule* 
schedule) {
       for (int i = 0; i < dest_params->instance_exec_params.size(); ++i) {
         TPlanFragmentDestination& dest = src_params->destinations[i];
         
dest.__set_fragment_instance_id(dest_params->instance_exec_params[i].instance_id);
-        dest.__set_server(dest_params->instance_exec_params[i].host);
+        const TNetworkAddress& host = 
dest_params->instance_exec_params[i].host;
+        dest.__set_server(host);
+        if (FLAGS_use_krpc) {
+          const TBackendDescriptor& desc = LookUpBackendDesc(executor_config, 
host);
+          DCHECK(desc.__isset.krpc_address);
+          DCHECK(IsResolvedAddress(desc.krpc_address));
+          dest.__set_krpc_server(desc.krpc_address);
+        }
       }
 
       // enumerate senders consecutively;
@@ -679,8 +701,11 @@ void Scheduler::GetScanHosts(TPlanNodeId scan_id, const 
FragmentExecParams& para
 }
 
 Status Scheduler::Schedule(QuerySchedule* schedule) {
-  RETURN_IF_ERROR(ComputeScanRangeAssignment(schedule));
-  ComputeFragmentExecParams(schedule);
+  // Make a copy of the executor_config upfront to avoid using inconsistent 
views
+  // between ComputeScanRangeAssignment() and ComputeFragmentExecParams().
+  ExecutorsConfigPtr config_ptr = GetExecutorsConfig();
+  RETURN_IF_ERROR(ComputeScanRangeAssignment(*config_ptr, schedule));
+  ComputeFragmentExecParams(*config_ptr, schedule);
   ComputeBackendExecParams(schedule);
 #ifndef NDEBUG
   schedule->Validate();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/scheduling/scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 07e0cfc..d1a22f8 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -78,15 +78,17 @@ class Scheduler {
   /// Initialize with a subscription manager that we can register with for 
updates to the
   /// set of available backends.
   ///  - backend_id - unique identifier for this Impala backend (usually a 
host:port)
-  ///  - backend_address - the address that this backend listens on
   Scheduler(StatestoreSubscriber* subscriber, const std::string& backend_id,
-      const TNetworkAddress& backend_address, MetricGroup* metrics, Webserver* 
webserver,
+      MetricGroup* metrics, Webserver* webserver,
       RequestPoolService* request_pool_service);
 
-  /// Initialises the scheduler, acquiring all resources needed to make 
scheduling
+  /// Initializes the scheduler, acquiring all resources needed to make 
scheduling
   /// decisions once this method returns. Register with the subscription 
manager if
-  /// required.
-  impala::Status Init();
+  /// required. Also initializes the local backend descriptor. Returns error 
status
+  /// on failure. 'backend_address' is the address of thrift based 
ImpalaInternalService
+  /// of this backend. 'krpc_port' is the port on which KRPC based 
ImpalaInternalService
+  /// is exported.
+  Status Init(const TNetworkAddress& backend_address, int krpc_port);
 
   /// Populates given query schedule and assigns fragments to hosts based on 
scan
   /// ranges in the query exec request.
@@ -300,14 +302,14 @@ class Scheduler {
   ThriftSerializer thrift_serializer_;
 
   /// Locality metrics
-  IntCounter* total_assignments_;
-  IntCounter* total_local_assignments_;
+  IntCounter* total_assignments_ = nullptr;
+  IntCounter* total_local_assignments_ = nullptr;
 
   /// Initialization metric
-  BooleanProperty* initialized_;
+  BooleanProperty* initialized_ = nullptr;
 
   /// Current number of executors
-  IntGauge* num_fragment_instances_metric_;
+  IntGauge* num_fragment_instances_metric_ = nullptr;
 
   /// Used for user-to-pool resolution and looking up pool configurations. Not 
owned by
   /// us.
@@ -318,6 +320,12 @@ class Scheduler {
   ExecutorsConfigPtr GetExecutorsConfig() const;
   void SetExecutorsConfig(const ExecutorsConfigPtr& executors_config);
 
+  /// Returns the backend descriptor corresponding to 'host' which could be a 
remote
+  /// backend or the local host itself. The returned descriptor should not be 
retained
+  /// beyond the lifetime of 'executor_config'.
+  const TBackendDescriptor& LookUpBackendDesc(
+      const BackendConfig& executor_config, const TNetworkAddress& host);
+
   /// Called asynchronously when an update is received from the subscription 
manager
   void UpdateMembership(const StatestoreSubscriber::TopicDeltaMap& 
incoming_topic_deltas,
       std::vector<TTopicDelta>* subscriber_topic_updates);
@@ -331,7 +339,9 @@ class Scheduler {
   /// Unpartitioned fragments are assigned to the coordinator. Populate the 
schedule's
   /// fragment_exec_params_ with the resulting scan range assignment.
   /// We have a benchmark for this method in 
be/src/benchmarks/scheduler-benchmark.cc.
-  Status ComputeScanRangeAssignment(QuerySchedule* schedule);
+  /// 'executor_config' is the executor configuration to use for scheduling.
+  Status ComputeScanRangeAssignment(const BackendConfig& executor_config,
+      QuerySchedule* schedule);
 
   /// Process the list of scan ranges of a single plan node and compute scan 
range
   /// assignments (returned in 'assignment'). The result is a mapping from 
hosts to their
@@ -407,7 +417,9 @@ class Scheduler {
   /// TQueryExecRequest.plan_exec_info.
   /// This includes the routing information (destinations, 
per_exch_num_senders,
   /// sender_id)
-  void ComputeFragmentExecParams(QuerySchedule* schedule);
+  /// 'executor_config' is the executor configuration to use for scheduling.
+  void ComputeFragmentExecParams(const BackendConfig& executor_config,
+      QuerySchedule* schedule);
 
   /// Recursively create FInstanceExecParams and set per_node_scan_ranges for
   /// fragment_params and its input fragments via a depth-first traversal.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index fb1d0e6..4d55c85 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -110,6 +110,7 @@ DECLARE_string(authorized_proxy_user_config);
 DECLARE_string(authorized_proxy_user_config_delimiter);
 DECLARE_bool(abort_on_config_error);
 DECLARE_bool(disk_spill_encryption);
+DECLARE_bool(use_krpc);
 
 DEFINE_int32(beeswax_port, 21000, "port on which Beeswax client requests are 
served");
 DEFINE_int32(hs2_port, 21050, "port on which HiveServer2 client requests are 
served");
@@ -921,7 +922,7 @@ void ImpalaServer::PrepareQueryContext(TQueryCtx* 
query_ctx) {
   local_timestamp.UtcToLocal();
   query_ctx->__set_now_string(local_timestamp.ToString());
   query_ctx->__set_start_unix_millis(UnixMillis());
-  query_ctx->__set_coord_address(MakeNetworkAddress(FLAGS_hostname, 
FLAGS_be_port));
+  query_ctx->__set_coord_address(ExecEnv::GetInstance()->backend_address());
 
   // Creating a random_generator every time is not free, but
   // benchmarks show it to be slightly cheaper than contending for a
@@ -1641,8 +1642,7 @@ void ImpalaServer::AddLocalBackendToStatestore(
   TBackendDescriptor local_backend_descriptor;
   local_backend_descriptor.__set_is_coordinator(FLAGS_is_coordinator);
   local_backend_descriptor.__set_is_executor(FLAGS_is_executor);
-  local_backend_descriptor.__set_address(
-      MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port));
+  local_backend_descriptor.__set_address(exec_env_->backend_address());
   IpAddr ip;
   const Hostname& hostname = local_backend_descriptor.address.hostname;
   Status status = HostnameToIpAddr(hostname, &ip);
@@ -1653,6 +1653,10 @@ void ImpalaServer::AddLocalBackendToStatestore(
     return;
   }
   local_backend_descriptor.ip_address = ip;
+  if (FLAGS_use_krpc) {
+    TNetworkAddress krpc_address = MakeNetworkAddress(ip, 
exec_env_->krpc_port());
+    local_backend_descriptor.__set_krpc_address(krpc_address);
+  }
   subscriber_topic_updates->emplace_back(TTopicDelta());
   TTopicDelta& update = subscriber_topic_updates->back();
   update.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/testutil/in-process-servers.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.cc 
b/be/src/testutil/in-process-servers.cc
index 4fecfb3..5b2a4e5 100644
--- a/be/src/testutil/in-process-servers.cc
+++ b/be/src/testutil/in-process-servers.cc
@@ -35,6 +35,7 @@
 DECLARE_string(ssl_server_certificate);
 DECLARE_string(ssl_private_key);
 DECLARE_int32(be_port);
+DECLARE_int32(krpc_port);
 
 using namespace apache::thrift;
 using namespace impala;
@@ -49,6 +50,10 @@ InProcessImpalaServer* 
InProcessImpalaServer::StartWithEphemeralPorts(
     // backend interface.
     FLAGS_be_port = backend_port;
 
+    int krpc_port = FindUnusedEphemeralPort(&used_ports);
+    if (krpc_port == -1) continue;
+    FLAGS_krpc_port = krpc_port;
+
     int subscriber_port = FindUnusedEphemeralPort(&used_ports);
     if (subscriber_port == -1) continue;
 
@@ -62,7 +67,7 @@ InProcessImpalaServer* 
InProcessImpalaServer::StartWithEphemeralPorts(
     if (hs2_port == -1) continue;
 
     InProcessImpalaServer* impala =
-        new InProcessImpalaServer("localhost", backend_port, subscriber_port,
+        new InProcessImpalaServer("localhost", backend_port, krpc_port, 
subscriber_port,
             webserver_port, statestore_host, statestore_port);
     // Start the daemon and check if it works, if not delete the current 
server object and
     // pick a new set of ports
@@ -79,14 +84,14 @@ InProcessImpalaServer* 
InProcessImpalaServer::StartWithEphemeralPorts(
 }
 
 InProcessImpalaServer::InProcessImpalaServer(const string& hostname, int 
backend_port,
-    int subscriber_port, int webserver_port, const string& statestore_host,
+    int krpc_port, int subscriber_port, int webserver_port, const string& 
statestore_host,
     int statestore_port)
     : hostname_(hostname), backend_port_(backend_port),
       beeswax_port_(0),
       hs2_port_(0),
       impala_server_(NULL),
-      exec_env_(new ExecEnv(hostname, backend_port, subscriber_port, 
webserver_port,
-          statestore_host, statestore_port)) {
+      exec_env_(new ExecEnv(hostname, backend_port, krpc_port, subscriber_port,
+          webserver_port, statestore_host, statestore_port)) {
 }
 
 Status InProcessImpalaServer::SetCatalogInitialized() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/testutil/in-process-servers.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.h 
b/be/src/testutil/in-process-servers.h
index 03b02f3..d22c441 100644
--- a/be/src/testutil/in-process-servers.h
+++ b/be/src/testutil/in-process-servers.h
@@ -42,7 +42,7 @@ class InProcessImpalaServer {
  public:
   /// Initialises the server, but does not start any network-attached
   /// services or run any threads.
-  InProcessImpalaServer(const std::string& hostname, int backend_port,
+  InProcessImpalaServer(const std::string& hostname, int backend_port, int 
krpc_port,
                         int subscriber_port, int webserver_port,
                         const std::string& statestore_host, int 
statestore_port);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/util/network-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc
index 6228a0b..1a9ce53 100644
--- a/be/src/util/network-util.cc
+++ b/be/src/util/network-util.cc
@@ -28,6 +28,7 @@
 #include <vector>
 #include <boost/algorithm/string.hpp>
 
+#include "kudu/util/net/sockaddr.h"
 #include "util/debug-util.h"
 #include "util/error-util.h"
 #include <util/string-parser.h>
@@ -113,6 +114,11 @@ Status HostnameToIpAddr(const Hostname& hostname, IpAddr* 
ip){
   return Status::OK();
 }
 
+bool IsResolvedAddress(const TNetworkAddress& addr) {
+  kudu::Sockaddr sock;
+  return sock.ParseString(addr.hostname, addr.port).ok();
+}
+
 bool FindFirstNonLocalhost(const vector<string>& addresses, string* addr) {
   for (const string& candidate: addresses) {
     if (candidate != LOCALHOST) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/be/src/util/network-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h
index 1783589..e964f5c 100644
--- a/be/src/util/network-util.h
+++ b/be/src/util/network-util.h
@@ -28,6 +28,9 @@ typedef std::string Hostname;
 /// Type to store IPv4 addresses.
 typedef std::string IpAddr;
 
+/// Returns true if 'addr' is a fully resolved IP address, rather than a fqdn 
+ port.
+bool IsResolvedAddress(const TNetworkAddress& addr);
+
 /// Looks up all IP addresses associated with a given hostname and returns one 
of them via
 /// 'address'. If the IP addresses of a host don't change, then subsequent 
calls will
 /// always return the same address. Returns an error status if any system call 
failed,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/bin/start-impala-cluster.py
----------------------------------------------------------------------
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index c253ca8..af960d9 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -94,7 +94,7 @@ CATALOGD_PATH = os.path.join(IMPALA_HOME,
 MINI_IMPALA_CLUSTER_PATH = IMPALAD_PATH + " -in-process"
 
 IMPALA_SHELL = os.path.join(IMPALA_HOME, 'bin/impala-shell.sh')
-IMPALAD_PORTS = ("-beeswax_port=%d -hs2_port=%d  -be_port=%d "
+IMPALAD_PORTS = ("-beeswax_port=%d -hs2_port=%d  -be_port=%d -krpc_port=%d "
                  "-state_store_subscriber_port=%d -webserver_port=%d")
 JVM_ARGS = "-jvm_debug_port=%s -jvm_args=%s"
 BE_LOGGING_ARGS = "-log_filename=%s -log_dir=%s -v=%s -logbufsecs=5 
-max_log_files=%s"
@@ -187,10 +187,12 @@ def build_impalad_port_args(instance_num):
   BASE_BEESWAX_PORT = 21000
   BASE_HS2_PORT = 21050
   BASE_BE_PORT = 22000
+  BASE_KRPC_PORT = 29000
   BASE_STATE_STORE_SUBSCRIBER_PORT = 23000
   BASE_WEBSERVER_PORT = 25000
   return IMPALAD_PORTS % (BASE_BEESWAX_PORT + instance_num, BASE_HS2_PORT + 
instance_num,
                           BASE_BE_PORT + instance_num,
+                          BASE_KRPC_PORT + instance_num,
                           BASE_STATE_STORE_SUBSCRIBER_PORT + instance_num,
                           BASE_WEBSERVER_PORT + instance_num)
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift 
b/common/thrift/ImpalaInternalService.thrift
index 54fcdaf..39df289 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -350,7 +350,7 @@ struct TQueryCtx {
   // Process ID of the impalad to which the user is connected.
   5: required i32 pid
 
-  // Initiating coordinator.
+  // The initiating coordinator's address of its thrift based 
ImpalaInternalService.
   // TODO: determine whether we can get this somehow via the Thrift rpc 
mechanism.
   6: optional Types.TNetworkAddress coord_address
 
@@ -404,8 +404,11 @@ struct TPlanFragmentDestination {
   // the globally unique fragment instance id
   1: required Types.TUniqueId fragment_instance_id
 
-  // ... which is being executed on this server
+  // IP address + port of the thrift based ImpalaInteralService on the 
destination
   2: required Types.TNetworkAddress server
+
+  // IP address + port of the KRPC based ImpalaInternalService on the 
destination
+  3: optional Types.TNetworkAddress krpc_server
 }
 
 // Context to collect information, which is shared among all instances of that 
plan

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7b6ad283/common/thrift/StatestoreService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/StatestoreService.thrift 
b/common/thrift/StatestoreService.thrift
index 90400b2..60a0d0d 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -50,7 +50,7 @@ struct TPoolStats {
 // all other Impalads in the cluster. Impalads can act as coordinators, 
executors or
 // both.
 struct TBackendDescriptor {
-  // Network address of the Impala service on this backend
+  // Network address of the thrift based ImpalaInternalService on this backend
   1: required Types.TNetworkAddress address;
 
   // IP address corresponding to address.hostname. Explicitly including this 
saves the
@@ -68,6 +68,9 @@ struct TBackendDescriptor {
 
   // True if the debug webserver is secured (for correctly generating links)
   6: optional bool secure_webserver;
+
+  // IP address + port of KRPC based ImpalaInternalService on this backend
+  7: optional Types.TNetworkAddress krpc_address;
 }
 
 // Description of a single entry in a topic

Reply via email to