This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push: new 1af60a1 IMPALA-9180 (part 3): Remove legacy backend port 1af60a1 is described below commit 1af60a15605463ab4ba00d5326d130d0a3165821 Author: wzhou-code <wz...@cloudera.com> AuthorDate: Thu Oct 1 13:47:44 2020 -0700 IMPALA-9180 (part 3): Remove legacy backend port The legacy Thrift-based Impala internal service has been removed, so the backend port 22000 can be freed up. This patch marks the be_port flag as a REMOVED_FLAG and cleans up all the infrastructure around it. StatestoreSubscriber::subscriber_id is now set to hostname + krpc_port. Testing: - Passed the exhaustive tests. Change-Id: Ic6909a8da449b4d25ee98037b3eb459af4850dc6 Reviewed-on: http://gerrit.cloudera.org:8080/16533 Reviewed-by: Thomas Tauber-Marshall <tmarsh...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- be/src/common/global-flags.cc | 3 +-- be/src/runtime/coordinator-backend-state.h | 4 ++-- be/src/runtime/data-stream-test.cc | 4 ++-- be/src/runtime/exec-env.cc | 9 ++++----- be/src/runtime/krpc-data-stream-sender.cc | 6 +++--- be/src/scheduling/schedule-state.h | 4 ++-- be/src/scheduling/scheduler.cc | 2 +- bin/start-impala-cluster.py | 4 +--- common/protobuf/admission_control_service.proto | 4 ++-- common/protobuf/control_service.proto | 6 +++--- common/protobuf/statestore_service.proto | 4 ++-- infra/deploy/deploy.py | 1 - tests/common/impala_cluster.py | 6 +----- tests/common/impala_service.py | 3 +-- tests/custom_cluster/test_query_retries.py | 4 ++-- tests/shell/test_shell_interactive.py | 2 +- 16 files changed, 28 insertions(+), 38 deletions(-) diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc index 3025be9..339ff14 100644 --- a/be/src/common/global-flags.cc +++ b/be/src/common/global-flags.cc @@ -38,8 +38,6 @@ DEFINE_string(hostname, "", "Hostname to use for this daemon, also used as part DEFINE_bool(use_resolved_hostname, false, "If true, --hostname is resolved 
before use, " "so that the IP address will be used everywhere instead of the hostname."); -DEFINE_int32(be_port, 22000, - "port on which thrift based ImpalaInternalService is exported"); DEFINE_int32(krpc_port, 27000, "port on which KRPC based ImpalaInternalService is exported"); @@ -379,6 +377,7 @@ REMOVED_FLAG(abfs_read_chunk_size); REMOVED_FLAG(adls_read_chunk_size); REMOVED_FLAG(authorization_policy_file); REMOVED_FLAG(authorization_policy_provider_class); +REMOVED_FLAG(be_port); REMOVED_FLAG(be_service_threads); REMOVED_FLAG(cgroup_hierarchy_path); REMOVED_FLAG(coordinator_rpc_threads); diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h index e9b300e..9797b13 100644 --- a/be/src/runtime/coordinator-backend-state.h +++ b/be/src/runtime/coordinator-backend-state.h @@ -336,9 +336,9 @@ class Coordinator::BackendState { /// Owned by coordinator object pool provided in the c'tor, created in Update(). RuntimeProfile* host_profile_ = nullptr; - /// Thrift address of execution backend. + /// Address of execution backend: hostname + krpc_port. NetworkAddressPB host_; - /// Krpc address of execution backend. + /// Krpc address of execution backend: ip_address + krpc_port. NetworkAddressPB krpc_host_; /// The query context of the Coordinator that owns this BackendState. 
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc index 06b9dd6..d7f94a9 100644 --- a/be/src/runtime/data-stream-test.cc +++ b/be/src/runtime/data-stream-test.cc @@ -74,7 +74,7 @@ using kudu::rpc::ResultTracker; using kudu::rpc::RpcContext; using kudu::rpc::ServiceIf; -DEFINE_int32(port, 20001, "port on which to run Impala Thrift based test backend."); +DEFINE_int32(port, 20001, "port on which to run Impala krpc based test backend."); DECLARE_int32(datastream_sender_timeout_ms); DECLARE_int32(datastream_service_num_deserialization_threads); DECLARE_int32(datastream_service_deserialization_queue_size); @@ -320,7 +320,7 @@ class DataStreamTest : public testing::Test { void GetNextInstanceId(TUniqueId* instance_id) { PlanFragmentDestinationPB* dest = dest_.Add(); *dest->mutable_fragment_instance_id() = next_instance_id_; - *dest->mutable_thrift_backend() = MakeNetworkAddressPB("localhost", FLAGS_port); + *dest->mutable_address() = MakeNetworkAddressPB("localhost", FLAGS_port); *dest->mutable_krpc_backend() = FromTNetworkAddress(krpc_address_); UniqueIdPBToTUniqueId(next_instance_id_, instance_id); next_instance_id_.set_lo(next_instance_id_.lo() + 1); diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc index fe2bfd8..0dd78bc 100644 --- a/be/src/runtime/exec-env.cc +++ b/be/src/runtime/exec-env.cc @@ -124,7 +124,6 @@ DEFINE_int32_hidden(local_catalog_max_fetch_retries, 40, DECLARE_int32(state_store_port); DECLARE_int32(num_threads_per_core); DECLARE_int32(num_cores); -DECLARE_int32(be_port); DECLARE_int32(krpc_port); DECLARE_string(mem_limit); DECLARE_bool(mem_limit_includes_jvm); @@ -274,10 +273,10 @@ ExecEnv::ExecEnv(int krpc_port, int subscriber_port, int webserver_port, TNetworkAddress statestore_address = MakeNetworkAddress(statestore_host, statestore_port); - // Set StatestoreSubscriber::subscriber_id as hostname + be_port. 
- statestore_subscriber_.reset( - new StatestoreSubscriber(Substitute("impalad@$0:$1", FLAGS_hostname, FLAGS_be_port), - subscriber_address, statestore_address, metrics_.get())); + // Set StatestoreSubscriber::subscriber_id as hostname + krpc_port. + statestore_subscriber_.reset(new StatestoreSubscriber( + Substitute("impalad@$0:$1", FLAGS_hostname, FLAGS_krpc_port), subscriber_address, + statestore_address, metrics_.get())); if (FLAGS_is_coordinator) { hdfs_op_thread_pool_.reset( diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc index 42bcec9..42d0435 100644 --- a/be/src/runtime/krpc-data-stream-sender.cc +++ b/be/src/runtime/krpc-data-stream-sender.cc @@ -732,9 +732,9 @@ KrpcDataStreamSender::KrpcDataStreamSender(TDataSinkId sink_id, int sender_id, || sink.output_partition.type == TPartitionType::KUDU); for (const auto& destination : destinations) { - channels_.emplace_back(new Channel(this, row_desc_, - destination.thrift_backend().hostname(), destination.krpc_backend(), - destination.fragment_instance_id(), sink.dest_node_id, per_channel_buffer_size)); + channels_.emplace_back(new Channel(this, row_desc_, destination.address().hostname(), + destination.krpc_backend(), destination.fragment_instance_id(), sink.dest_node_id, + per_channel_buffer_size)); } if (partition_type_ == TPartitionType::UNPARTITIONED diff --git a/be/src/scheduling/schedule-state.h b/be/src/scheduling/schedule-state.h index a011d94..39181cb 100644 --- a/be/src/scheduling/schedule-state.h +++ b/be/src/scheduling/schedule-state.h @@ -75,10 +75,10 @@ typedef std::unordered_map<NetworkAddressPB, BackendScheduleState> /// transferred to the corresponding BackendExecParamsPB in /// Scheduler::ComputeBackendExecParams(). struct FInstanceScheduleState { - /// Thrift address of execution backend. + /// Address of execution backend: hostname + krpc_port. NetworkAddressPB host; - /// Krpc address of execution backend. 
+ /// Krpc address of execution backend: ip_address + krpc_port. NetworkAddressPB krpc_host; /// Contains any info that needs to be sent back to the coordinator. Computed during diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc index 850c086..1034fce 100644 --- a/be/src/scheduling/scheduler.cc +++ b/be/src/scheduling/scheduler.cc @@ -229,7 +229,7 @@ void Scheduler::ComputeFragmentExecParams( *dest->mutable_fragment_instance_id() = dest_state->instance_states[i].exec_params.instance_id(); const NetworkAddressPB& host = dest_state->instance_states[i].host; - *dest->mutable_thrift_backend() = host; + *dest->mutable_address() = host; const BackendDescriptorPB& desc = LookUpBackendDesc(executor_config, host); DCHECK(desc.has_krpc_address()); DCHECK(IsResolvedAddress(desc.krpc_address())); diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py index 0edee2c..15abdf1 100755 --- a/bin/start-impala-cluster.py +++ b/bin/start-impala-cluster.py @@ -36,7 +36,7 @@ from subprocess import call, check_call from testdata.common import cgroups from tests.common.environ import build_flavor_timeout from tests.common.impala_cluster import (ImpalaCluster, DEFAULT_BEESWAX_PORT, - DEFAULT_HS2_PORT, DEFAULT_BE_PORT, DEFAULT_KRPC_PORT, DEFAULT_HS2_HTTP_PORT, + DEFAULT_HS2_PORT, DEFAULT_KRPC_PORT, DEFAULT_HS2_HTTP_PORT, DEFAULT_STATE_STORE_SUBSCRIBER_PORT, DEFAULT_IMPALAD_WEBSERVER_PORT, DEFAULT_STATESTORED_WEBSERVER_PORT, DEFAULT_CATALOGD_WEBSERVER_PORT, DEFAULT_CATALOGD_JVM_DEBUG_PORT, DEFAULT_IMPALAD_JVM_DEBUG_PORT, @@ -221,7 +221,6 @@ def choose_impalad_ports(instance_num): return {'beeswax_port': DEFAULT_BEESWAX_PORT + instance_num, 'hs2_port': DEFAULT_HS2_PORT + instance_num, 'hs2_http_port': DEFAULT_HS2_HTTP_PORT + instance_num, - 'be_port': DEFAULT_BE_PORT + instance_num, 'krpc_port': DEFAULT_KRPC_PORT + instance_num, 'state_store_subscriber_port': DEFAULT_STATE_STORE_SUBSCRIBER_PORT + instance_num, @@ -233,7 +232,6 @@ def 
build_impalad_port_args(instance_num): "-beeswax_port={beeswax_port} " "-hs2_port={hs2_port} " "-hs2_http_port={hs2_http_port} " - "-be_port={be_port} " "-krpc_port={krpc_port} " "-state_store_subscriber_port={state_store_subscriber_port} " "-webserver_port={webserver_port}") diff --git a/common/protobuf/admission_control_service.proto b/common/protobuf/admission_control_service.proto index 51d88f2..6b59af0 100644 --- a/common/protobuf/admission_control_service.proto +++ b/common/protobuf/admission_control_service.proto @@ -58,10 +58,10 @@ message BackendExecParamsPB { // The id of this backend. optional UniqueIdPB backend_id = 1; - // The hostname + port of the Thrift ImpalaInternalService on this backend. + // The hostname + port of the KRPC backend service on this backend. optional NetworkAddressPB address = 8; - // The IP address + port of the KRPC ControlService on this backend. + // The IP address + port of the KRPC backend service on this backend. optional NetworkAddressPB krpc_address = 9; // The fragment instance params assigned to this backend. All instances of a diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto index eb0b66a..d8483ff 100644 --- a/common/protobuf/control_service.proto +++ b/common/protobuf/control_service.proto @@ -291,10 +291,10 @@ message PlanFragmentDestinationPB { // The globally unique fragment instance id. optional UniqueIdPB fragment_instance_id = 1; - // Hostname + port of the Thrift based ImpalaInteralService on the destination. - optional NetworkAddressPB thrift_backend = 2; + // Hostname + port of the KRPC backend service on the destination. + optional NetworkAddressPB address = 2; - // IP address + port of the KRPC based ImpalaInternalService on the destination. + // IP address + port of the KRPC backend service on the destination. 
optional NetworkAddressPB krpc_backend = 3; } diff --git a/common/protobuf/statestore_service.proto b/common/protobuf/statestore_service.proto index ff9f7d9..8938934 100644 --- a/common/protobuf/statestore_service.proto +++ b/common/protobuf/statestore_service.proto @@ -40,7 +40,7 @@ message BackendDescriptorPB { // Unique identifier for this impalad. Generated on startup. optional UniqueIdPB backend_id = 1; - // Network address of the thrift based ImpalaInternalService on this backend + // Hostname + port of the KRPC backend service on this backend. optional NetworkAddressPB address = 2; // IP address corresponding to address.hostname. Explicitly including this saves the @@ -59,7 +59,7 @@ message BackendDescriptorPB { // True if the debug webserver is secured (for correctly generating links) optional bool secure_webserver = 7; - // IP address + port of KRPC based ImpalaInternalService on this backend + // IP address + port of the KRPC backend service on this backend. optional NetworkAddressPB krpc_address = 8; // The amount of memory that can be admitted to this backend (in bytes). 
diff --git a/infra/deploy/deploy.py b/infra/deploy/deploy.py index aa7db01..9fcfb28 100644 --- a/infra/deploy/deploy.py +++ b/infra/deploy/deploy.py @@ -360,7 +360,6 @@ def transform_rcg_config(rcg): new_config.update(transform_path(rcg.name, full, "log_dir")) new_config.update(transform_path(rcg.name, full, "scratch_dirs")) - new_config.update(transform_port(rcg.name, full, "be_port")) new_config.update(transform_port(rcg.name, full, "beeswax_port")) new_config.update(transform_port(rcg.name, full, "hs2_port")) new_config.update(transform_port(rcg.name, full, "impalad_webserver_port")) diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py index b836b43..35face8 100644 --- a/tests/common/impala_cluster.py +++ b/tests/common/impala_cluster.py @@ -51,7 +51,6 @@ START_DAEMON_PATH = os.path.join(IMPALA_HOME, 'bin/start-daemon.sh') DEFAULT_BEESWAX_PORT = 21000 DEFAULT_HS2_PORT = 21050 DEFAULT_HS2_HTTP_PORT = 28000 -DEFAULT_BE_PORT = 22000 DEFAULT_KRPC_PORT = 27000 DEFAULT_CATALOG_SERVICE_PORT = 26000 DEFAULT_STATE_STORE_SUBSCRIBER_PORT = 23000 @@ -456,7 +455,7 @@ class ImpaladProcess(BaseImpalaProcess): def __init__(self, cmd, container_id=None, port_map=None): super(ImpaladProcess, self).__init__(cmd, container_id, port_map) self.service = ImpaladService(self.hostname, self.webserver_interface, - self.get_webserver_port(), self.__get_beeswax_port(), self.__get_be_port(), + self.get_webserver_port(), self.__get_beeswax_port(), self.__get_krpc_port(), self.__get_hs2_port(), self.__get_hs2_http_port(), self._get_webserver_certificate_file()) @@ -466,9 +465,6 @@ class ImpaladProcess(BaseImpalaProcess): def __get_beeswax_port(self): return int(self._get_port('beeswax_port', DEFAULT_BEESWAX_PORT)) - def __get_be_port(self): - return int(self._get_port('be_port', DEFAULT_BE_PORT)) - def __get_krpc_port(self): return int(self._get_port('krpc_port', DEFAULT_KRPC_PORT)) diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py index 
fe6b0e6..3d4a507 100644 --- a/tests/common/impala_service.py +++ b/tests/common/impala_service.py @@ -170,12 +170,11 @@ class BaseImpalaService(object): # new connections or accessing the debug webpage. class ImpaladService(BaseImpalaService): def __init__(self, hostname, webserver_interface="", webserver_port=25000, - beeswax_port=21000, be_port=22000, krpc_port=27000, hs2_port=21050, + beeswax_port=21000, krpc_port=27000, hs2_port=21050, hs2_http_port=28000, webserver_certificate_file=""): super(ImpaladService, self).__init__( hostname, webserver_interface, webserver_port, webserver_certificate_file) self.beeswax_port = beeswax_port - self.be_port = be_port self.krpc_port = krpc_port self.hs2_port = hs2_port self.hs2_http_port = hs2_http_port diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py index 45caa25..38e17f0 100644 --- a/tests/custom_cluster/test_query_retries.py +++ b/tests/custom_cluster/test_query_retries.py @@ -950,13 +950,13 @@ class TestQueryRetries(CustomClusterTestSuite): """Validate that the given profile indicates that the given impalad was not blacklisted during retried query execution""" assert not ("Blacklisted Executors: {0}:{1}".format(impalad.hostname, - impalad.service.be_port) in profile), profile + impalad.service.krpc_port) in profile), profile def __assert_executors_not_assigned_any_finstance(self, impalad, profile): """Validate that the given profile indicates that the given impalad was not assigned any fragment instance for query execution""" assert not ("host={0}:{1}".format(impalad.hostname, - impalad.service.be_port) in profile), profile + impalad.service.krpc_port) in profile), profile def __validate_client_log(self, handle, retried_query_id, use_hs2_client=False): """Validate the GetLog result contains query retry information""" diff --git a/tests/shell/test_shell_interactive.py b/tests/shell/test_shell_interactive.py index 84cd42d..320208e 100755 --- 
a/tests/shell/test_shell_interactive.py +++ b/tests/shell/test_shell_interactive.py @@ -398,7 +398,7 @@ class TestImpalaShellInteractive(ImpalaTestSuite): hostname = socket.getfqdn() initial_impala_service = ImpaladService(hostname) target_impala_service = ImpaladService(hostname, webserver_port=25001, - beeswax_port=21001, be_port=22001, hs2_port=21051, hs2_http_port=28001) + beeswax_port=21001, hs2_port=21051, hs2_http_port=28001) protocol = vector.get_value("protocol").lower() if protocol == "hs2": target_port = 21051