This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 1af60a1  IMPALA-9180 (part 3): Remove legacy backend port
1af60a1 is described below

commit 1af60a15605463ab4ba00d5326d130d0a3165821
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Thu Oct 1 13:47:44 2020 -0700

    IMPALA-9180 (part 3): Remove legacy backend port
    
    The legacy Thrift-based Impala internal service has been removed, so
    the backend port 22000 can be freed up.
    
    This patch marks the be_port flag as a REMOVED_FLAG and cleans up all
    infrastructure around it. StatestoreSubscriber::subscriber_id is now
    set to hostname + krpc_port (previously hostname + be_port).
    
    Testing:
     - Passed the exhaustive test.
    
    Change-Id: Ic6909a8da449b4d25ee98037b3eb459af4850dc6
    Reviewed-on: http://gerrit.cloudera.org:8080/16533
    Reviewed-by: Thomas Tauber-Marshall <tmarsh...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/common/global-flags.cc                   | 3 +--
 be/src/runtime/coordinator-backend-state.h      | 4 ++--
 be/src/runtime/data-stream-test.cc              | 4 ++--
 be/src/runtime/exec-env.cc                      | 9 ++++-----
 be/src/runtime/krpc-data-stream-sender.cc       | 6 +++---
 be/src/scheduling/schedule-state.h              | 4 ++--
 be/src/scheduling/scheduler.cc                  | 2 +-
 bin/start-impala-cluster.py                     | 4 +---
 common/protobuf/admission_control_service.proto | 4 ++--
 common/protobuf/control_service.proto           | 6 +++---
 common/protobuf/statestore_service.proto        | 4 ++--
 infra/deploy/deploy.py                          | 1 -
 tests/common/impala_cluster.py                  | 6 +-----
 tests/common/impala_service.py                  | 3 +--
 tests/custom_cluster/test_query_retries.py      | 4 ++--
 tests/shell/test_shell_interactive.py           | 2 +-
 16 files changed, 28 insertions(+), 38 deletions(-)

diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 3025be9..339ff14 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -38,8 +38,6 @@ DEFINE_string(hostname, "", "Hostname to use for this daemon, 
also used as part
 DEFINE_bool(use_resolved_hostname, false, "If true, --hostname is resolved 
before use, "
               "so that the IP address will be used everywhere instead of the 
hostname.");
 
-DEFINE_int32(be_port, 22000,
-    "port on which thrift based ImpalaInternalService is exported");
 DEFINE_int32(krpc_port, 27000,
     "port on which KRPC based ImpalaInternalService is exported");
 
@@ -379,6 +377,7 @@ REMOVED_FLAG(abfs_read_chunk_size);
 REMOVED_FLAG(adls_read_chunk_size);
 REMOVED_FLAG(authorization_policy_file);
 REMOVED_FLAG(authorization_policy_provider_class);
+REMOVED_FLAG(be_port);
 REMOVED_FLAG(be_service_threads);
 REMOVED_FLAG(cgroup_hierarchy_path);
 REMOVED_FLAG(coordinator_rpc_threads);
diff --git a/be/src/runtime/coordinator-backend-state.h 
b/be/src/runtime/coordinator-backend-state.h
index e9b300e..9797b13 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -336,9 +336,9 @@ class Coordinator::BackendState {
   /// Owned by coordinator object pool provided in the c'tor, created in 
Update().
   RuntimeProfile* host_profile_ = nullptr;
 
-  /// Thrift address of execution backend.
+  /// Address of execution backend: hostname + krpc_port.
   NetworkAddressPB host_;
-  /// Krpc address of execution backend.
+  /// Krpc address of execution backend: ip_address + krpc_port.
   NetworkAddressPB krpc_host_;
 
   /// The query context of the Coordinator that owns this BackendState.
diff --git a/be/src/runtime/data-stream-test.cc 
b/be/src/runtime/data-stream-test.cc
index 06b9dd6..d7f94a9 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -74,7 +74,7 @@ using kudu::rpc::ResultTracker;
 using kudu::rpc::RpcContext;
 using kudu::rpc::ServiceIf;
 
-DEFINE_int32(port, 20001, "port on which to run Impala Thrift based test 
backend.");
+DEFINE_int32(port, 20001, "port on which to run Impala krpc based test 
backend.");
 DECLARE_int32(datastream_sender_timeout_ms);
 DECLARE_int32(datastream_service_num_deserialization_threads);
 DECLARE_int32(datastream_service_deserialization_queue_size);
@@ -320,7 +320,7 @@ class DataStreamTest : public testing::Test {
   void GetNextInstanceId(TUniqueId* instance_id) {
     PlanFragmentDestinationPB* dest = dest_.Add();
     *dest->mutable_fragment_instance_id() = next_instance_id_;
-    *dest->mutable_thrift_backend() = MakeNetworkAddressPB("localhost", 
FLAGS_port);
+    *dest->mutable_address() = MakeNetworkAddressPB("localhost", FLAGS_port);
     *dest->mutable_krpc_backend() = FromTNetworkAddress(krpc_address_);
     UniqueIdPBToTUniqueId(next_instance_id_, instance_id);
     next_instance_id_.set_lo(next_instance_id_.lo() + 1);
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index fe2bfd8..0dd78bc 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -124,7 +124,6 @@ DEFINE_int32_hidden(local_catalog_max_fetch_retries, 40,
 DECLARE_int32(state_store_port);
 DECLARE_int32(num_threads_per_core);
 DECLARE_int32(num_cores);
-DECLARE_int32(be_port);
 DECLARE_int32(krpc_port);
 DECLARE_string(mem_limit);
 DECLARE_bool(mem_limit_includes_jvm);
@@ -274,10 +273,10 @@ ExecEnv::ExecEnv(int krpc_port, int subscriber_port, int 
webserver_port,
   TNetworkAddress statestore_address =
       MakeNetworkAddress(statestore_host, statestore_port);
 
-  // Set StatestoreSubscriber::subscriber_id as hostname + be_port.
-  statestore_subscriber_.reset(
-      new StatestoreSubscriber(Substitute("impalad@$0:$1", FLAGS_hostname, 
FLAGS_be_port),
-          subscriber_address, statestore_address, metrics_.get()));
+  // Set StatestoreSubscriber::subscriber_id as hostname + krpc_port.
+  statestore_subscriber_.reset(new StatestoreSubscriber(
+      Substitute("impalad@$0:$1", FLAGS_hostname, FLAGS_krpc_port), 
subscriber_address,
+      statestore_address, metrics_.get()));
 
   if (FLAGS_is_coordinator) {
     hdfs_op_thread_pool_.reset(
diff --git a/be/src/runtime/krpc-data-stream-sender.cc 
b/be/src/runtime/krpc-data-stream-sender.cc
index 42bcec9..42d0435 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -732,9 +732,9 @@ KrpcDataStreamSender::KrpcDataStreamSender(TDataSinkId 
sink_id, int sender_id,
       || sink.output_partition.type == TPartitionType::KUDU);
 
   for (const auto& destination : destinations) {
-    channels_.emplace_back(new Channel(this, row_desc_,
-        destination.thrift_backend().hostname(), destination.krpc_backend(),
-        destination.fragment_instance_id(), sink.dest_node_id, 
per_channel_buffer_size));
+    channels_.emplace_back(new Channel(this, row_desc_, 
destination.address().hostname(),
+        destination.krpc_backend(), destination.fragment_instance_id(), 
sink.dest_node_id,
+        per_channel_buffer_size));
   }
 
   if (partition_type_ == TPartitionType::UNPARTITIONED
diff --git a/be/src/scheduling/schedule-state.h 
b/be/src/scheduling/schedule-state.h
index a011d94..39181cb 100644
--- a/be/src/scheduling/schedule-state.h
+++ b/be/src/scheduling/schedule-state.h
@@ -75,10 +75,10 @@ typedef std::unordered_map<NetworkAddressPB, 
BackendScheduleState>
 /// transferred to the corresponding BackendExecParamsPB in
 /// Scheduler::ComputeBackendExecParams().
 struct FInstanceScheduleState {
-  /// Thrift address of execution backend.
+  /// Address of execution backend: hostname + krpc_port.
   NetworkAddressPB host;
 
-  /// Krpc address of execution backend.
+  /// Krpc address of execution backend: ip_address + krpc_port.
   NetworkAddressPB krpc_host;
 
   /// Contains any info that needs to be sent back to the coordinator. 
Computed during
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 850c086..1034fce 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -229,7 +229,7 @@ void Scheduler::ComputeFragmentExecParams(
         *dest->mutable_fragment_instance_id() =
             dest_state->instance_states[i].exec_params.instance_id();
         const NetworkAddressPB& host = dest_state->instance_states[i].host;
-        *dest->mutable_thrift_backend() = host;
+        *dest->mutable_address() = host;
         const BackendDescriptorPB& desc = LookUpBackendDesc(executor_config, 
host);
         DCHECK(desc.has_krpc_address());
         DCHECK(IsResolvedAddress(desc.krpc_address()));
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index 0edee2c..15abdf1 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -36,7 +36,7 @@ from subprocess import call, check_call
 from testdata.common import cgroups
 from tests.common.environ import build_flavor_timeout
 from tests.common.impala_cluster import (ImpalaCluster, DEFAULT_BEESWAX_PORT,
-    DEFAULT_HS2_PORT, DEFAULT_BE_PORT, DEFAULT_KRPC_PORT, 
DEFAULT_HS2_HTTP_PORT,
+    DEFAULT_HS2_PORT, DEFAULT_KRPC_PORT, DEFAULT_HS2_HTTP_PORT,
     DEFAULT_STATE_STORE_SUBSCRIBER_PORT, DEFAULT_IMPALAD_WEBSERVER_PORT,
     DEFAULT_STATESTORED_WEBSERVER_PORT, DEFAULT_CATALOGD_WEBSERVER_PORT,
     DEFAULT_CATALOGD_JVM_DEBUG_PORT, DEFAULT_IMPALAD_JVM_DEBUG_PORT,
@@ -221,7 +221,6 @@ def choose_impalad_ports(instance_num):
   return {'beeswax_port': DEFAULT_BEESWAX_PORT + instance_num,
           'hs2_port': DEFAULT_HS2_PORT + instance_num,
           'hs2_http_port': DEFAULT_HS2_HTTP_PORT + instance_num,
-          'be_port': DEFAULT_BE_PORT + instance_num,
           'krpc_port': DEFAULT_KRPC_PORT + instance_num,
           'state_store_subscriber_port':
               DEFAULT_STATE_STORE_SUBSCRIBER_PORT + instance_num,
@@ -233,7 +232,6 @@ def build_impalad_port_args(instance_num):
       "-beeswax_port={beeswax_port} "
       "-hs2_port={hs2_port} "
       "-hs2_http_port={hs2_http_port} "
-      "-be_port={be_port} "
       "-krpc_port={krpc_port} "
       "-state_store_subscriber_port={state_store_subscriber_port} "
       "-webserver_port={webserver_port}")
diff --git a/common/protobuf/admission_control_service.proto 
b/common/protobuf/admission_control_service.proto
index 51d88f2..6b59af0 100644
--- a/common/protobuf/admission_control_service.proto
+++ b/common/protobuf/admission_control_service.proto
@@ -58,10 +58,10 @@ message BackendExecParamsPB {
   // The id of this backend.
   optional UniqueIdPB backend_id = 1;
 
-  // The hostname + port of the Thrift ImpalaInternalService on this backend.
+  // The hostname + port of the KRPC backend service on this backend.
   optional NetworkAddressPB address = 8;
 
-  // The IP address + port of the KRPC ControlService on this backend.
+  // The IP address + port of the KRPC backend service on this backend.
   optional NetworkAddressPB krpc_address = 9;
 
   // The fragment instance params assigned to this backend. All instances of a
diff --git a/common/protobuf/control_service.proto 
b/common/protobuf/control_service.proto
index eb0b66a..d8483ff 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -291,10 +291,10 @@ message PlanFragmentDestinationPB {
   // The globally unique fragment instance id.
   optional UniqueIdPB fragment_instance_id = 1;
 
-  // Hostname + port of the Thrift based ImpalaInteralService on the 
destination.
-  optional NetworkAddressPB thrift_backend = 2;
+  // Hostname + port of the KRPC backend service on the destination.
+  optional NetworkAddressPB address = 2;
 
-  // IP address + port of the KRPC based ImpalaInternalService on the 
destination.
+  // IP address + port of the KRPC backend service on the destination.
   optional NetworkAddressPB krpc_backend = 3;
 }
 
diff --git a/common/protobuf/statestore_service.proto 
b/common/protobuf/statestore_service.proto
index ff9f7d9..8938934 100644
--- a/common/protobuf/statestore_service.proto
+++ b/common/protobuf/statestore_service.proto
@@ -40,7 +40,7 @@ message BackendDescriptorPB {
   // Unique identifier for this impalad. Generated on startup.
   optional UniqueIdPB backend_id = 1;
 
-  // Network address of the thrift based ImpalaInternalService on this backend
+  // Hostname + port of the KRPC backend service on this backend.
   optional NetworkAddressPB address = 2;
 
   // IP address corresponding to address.hostname. Explicitly including this 
saves the
@@ -59,7 +59,7 @@ message BackendDescriptorPB {
   // True if the debug webserver is secured (for correctly generating links)
   optional bool secure_webserver = 7;
 
-  // IP address + port of KRPC based ImpalaInternalService on this backend
+  // IP address + port of the KRPC backend service on this backend.
   optional NetworkAddressPB krpc_address = 8;
 
   // The amount of memory that can be admitted to this backend (in bytes).
diff --git a/infra/deploy/deploy.py b/infra/deploy/deploy.py
index aa7db01..9fcfb28 100644
--- a/infra/deploy/deploy.py
+++ b/infra/deploy/deploy.py
@@ -360,7 +360,6 @@ def transform_rcg_config(rcg):
         new_config.update(transform_path(rcg.name, full, "log_dir"))
         new_config.update(transform_path(rcg.name, full, "scratch_dirs"))
 
-        new_config.update(transform_port(rcg.name, full, "be_port"))
         new_config.update(transform_port(rcg.name, full, "beeswax_port"))
         new_config.update(transform_port(rcg.name, full, "hs2_port"))
         new_config.update(transform_port(rcg.name, full, 
"impalad_webserver_port"))
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index b836b43..35face8 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -51,7 +51,6 @@ START_DAEMON_PATH = os.path.join(IMPALA_HOME, 
'bin/start-daemon.sh')
 DEFAULT_BEESWAX_PORT = 21000
 DEFAULT_HS2_PORT = 21050
 DEFAULT_HS2_HTTP_PORT = 28000
-DEFAULT_BE_PORT = 22000
 DEFAULT_KRPC_PORT = 27000
 DEFAULT_CATALOG_SERVICE_PORT = 26000
 DEFAULT_STATE_STORE_SUBSCRIBER_PORT = 23000
@@ -456,7 +455,7 @@ class ImpaladProcess(BaseImpalaProcess):
   def __init__(self, cmd, container_id=None, port_map=None):
     super(ImpaladProcess, self).__init__(cmd, container_id, port_map)
     self.service = ImpaladService(self.hostname, self.webserver_interface,
-        self.get_webserver_port(), self.__get_beeswax_port(), 
self.__get_be_port(),
+        self.get_webserver_port(), self.__get_beeswax_port(),
         self.__get_krpc_port(), self.__get_hs2_port(), 
self.__get_hs2_http_port(),
         self._get_webserver_certificate_file())
 
@@ -466,9 +465,6 @@ class ImpaladProcess(BaseImpalaProcess):
   def __get_beeswax_port(self):
     return int(self._get_port('beeswax_port', DEFAULT_BEESWAX_PORT))
 
-  def __get_be_port(self):
-    return int(self._get_port('be_port', DEFAULT_BE_PORT))
-
   def __get_krpc_port(self):
     return int(self._get_port('krpc_port', DEFAULT_KRPC_PORT))
 
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index fe6b0e6..3d4a507 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -170,12 +170,11 @@ class BaseImpalaService(object):
 # new connections or accessing the debug webpage.
 class ImpaladService(BaseImpalaService):
   def __init__(self, hostname, webserver_interface="", webserver_port=25000,
-      beeswax_port=21000, be_port=22000, krpc_port=27000, hs2_port=21050,
+      beeswax_port=21000, krpc_port=27000, hs2_port=21050,
       hs2_http_port=28000, webserver_certificate_file=""):
     super(ImpaladService, self).__init__(
         hostname, webserver_interface, webserver_port, 
webserver_certificate_file)
     self.beeswax_port = beeswax_port
-    self.be_port = be_port
     self.krpc_port = krpc_port
     self.hs2_port = hs2_port
     self.hs2_http_port = hs2_http_port
diff --git a/tests/custom_cluster/test_query_retries.py 
b/tests/custom_cluster/test_query_retries.py
index 45caa25..38e17f0 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -950,13 +950,13 @@ class TestQueryRetries(CustomClusterTestSuite):
     """Validate that the given profile indicates that the given impalad was not
     blacklisted during retried query execution"""
     assert not ("Blacklisted Executors: {0}:{1}".format(impalad.hostname,
-        impalad.service.be_port) in profile), profile
+        impalad.service.krpc_port) in profile), profile
 
   def __assert_executors_not_assigned_any_finstance(self, impalad, profile):
     """Validate that the given profile indicates that the given impalad was not
     assigned any fragment instance for query execution"""
     assert not ("host={0}:{1}".format(impalad.hostname,
-        impalad.service.be_port) in profile), profile
+        impalad.service.krpc_port) in profile), profile
 
   def __validate_client_log(self, handle, retried_query_id, 
use_hs2_client=False):
     """Validate the GetLog result contains query retry information"""
diff --git a/tests/shell/test_shell_interactive.py 
b/tests/shell/test_shell_interactive.py
index 84cd42d..320208e 100755
--- a/tests/shell/test_shell_interactive.py
+++ b/tests/shell/test_shell_interactive.py
@@ -398,7 +398,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
       hostname = socket.getfqdn()
       initial_impala_service = ImpaladService(hostname)
       target_impala_service = ImpaladService(hostname, webserver_port=25001,
-          beeswax_port=21001, be_port=22001, hs2_port=21051, 
hs2_http_port=28001)
+          beeswax_port=21001, hs2_port=21051, hs2_http_port=28001)
       protocol = vector.get_value("protocol").lower()
       if protocol == "hs2":
         target_port = 21051

Reply via email to