This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ca17e307ab3abb2c95c27b3ba749adf6bf16efc7
Author: John Sherman <[email protected]>
AuthorDate: Thu Apr 2 22:46:17 2020 +0000

    IMPALA-10550: Add External Frontend service port
    
    - If external_fe_port flag is >0, spins up a new HS2 compatible
      service port
    - Added an enable_external_fe_support option to start-impala-cluster.py
      - When set, the impalads are started with external_fe_port
        assigned from 21150 upward (21150-21152)
    - Modify impalad_coordinator Dockerfile to expose external frontend
      port at 21150
    - The intent of this commit is to separate external frontend
      connections from normal hs2 connections
      - This allows a different security policy to be applied to
        each type of connection. The external_fe_port should be considered
        a privileged service and should only be exposed to an external
        frontend that performs user authentication and authorization
        checks on generated plans
    
    Change-Id: I991b5b05e12e37d8739e18ed1086bbb0228acc40
    Reviewed-by: Aman Sinha <[email protected]>
    Reviewed-on: http://gerrit.cloudera.org:8080/17125
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/rpc/authentication.cc          | 12 +++++++
 be/src/rpc/authentication.h           |  7 ++++
 be/src/service/impala-server.cc       | 38 ++++++++++++++++++++--
 be/src/service/impala-server.h        |  7 +++-
 be/src/service/impalad-main.cc        |  5 +--
 be/src/testutil/in-process-servers.cc |  2 +-
 bin/start-impala-cluster.py           | 12 +++++--
 common/thrift/metrics.json            | 60 +++++++++++++++++++++++++++++++++++
 docker/impalad_coordinator/Dockerfile |  2 ++
 tests/common/impala_cluster.py        |  1 +
 10 files changed, 137 insertions(+), 9 deletions(-)

diff --git a/be/src/rpc/authentication.cc b/be/src/rpc/authentication.cc
index 47bcebb..9931e2f 100644
--- a/be/src/rpc/authentication.cc
+++ b/be/src/rpc/authentication.cc
@@ -87,6 +87,7 @@ DECLARE_string(be_principal);
 DECLARE_string(krb5_ccname);
 DECLARE_string(krb5_conf);
 DECLARE_string(krb5_debug_file);
+DECLARE_int32(external_fe_port);
 
 // Defined in kudu/security/init.cc
 DECLARE_bool(use_system_auth_to_local);
@@ -1399,6 +1400,12 @@ Status AuthManager::Init() {
   }
   RETURN_IF_ERROR(internal_auth_provider_->Start());
   RETURN_IF_ERROR(external_auth_provider_->Start());
+  if (FLAGS_external_fe_port > 0 ||
+      (TestInfo::is_test() && FLAGS_external_fe_port == 0)) {
+    external_fe_auth_provider_.reset(new NoAuthProvider());
+    LOG(INFO) << "External Frontend communication is not authenticated";
+    RETURN_IF_ERROR(external_fe_auth_provider_->Start());
+  }
   return Status::OK();
 }
 
@@ -1407,6 +1414,11 @@ AuthProvider* AuthManager::GetExternalAuthProvider() {
   return external_auth_provider_.get();
 }
 
+AuthProvider* AuthManager::GetExternalFrontendAuthProvider() {
+  DCHECK(external_fe_auth_provider_.get() != NULL);
+  return external_fe_auth_provider_.get();
+}
+
 AuthProvider* AuthManager::GetInternalAuthProvider() {
   DCHECK(internal_auth_provider_.get() != NULL);
   return internal_auth_provider_.get();
diff --git a/be/src/rpc/authentication.h b/be/src/rpc/authentication.h
index cfd28c1..313d5d0 100644
--- a/be/src/rpc/authentication.h
+++ b/be/src/rpc/authentication.h
@@ -67,6 +67,12 @@ class AuthManager {
   /// internal process.
   AuthProvider* GetExternalAuthProvider();
 
+  /// Returns the authentication provider to use for "external frontend" communication.
+  /// This only applies to the server side of a connection; the client side of said
+  /// connection is never an internal process. Currently this is either null if
+  /// external_fe_port <= 0 or NoAuthProvider.
+  AuthProvider* GetExternalFrontendAuthProvider();
+
   /// Returns the authentication provider to use for internal daemon <-> daemon
   /// connections.  This goes for both the client and server sides.  An example
   /// connection this applies to would be backend <-> statestore.
@@ -89,6 +95,7 @@ class AuthManager {
   boost::scoped_ptr<AuthProvider> internal_auth_provider_;
   boost::scoped_ptr<AuthProvider> external_auth_provider_;
   boost::scoped_ptr<AuthProvider> external_http_auth_provider_;
+  boost::scoped_ptr<AuthProvider> external_fe_auth_provider_;
 
   /// Used to authenticate usernames and passwords to LDAP.
   std::unique_ptr<ImpalaLdap> ldap_;
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 09e6dc2..0e8bfe2 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -147,6 +147,10 @@ DEFINE_int32(hs2_port, 21050, "port on which HiveServer2 
client requests are ser
     "If 0 or less, the HiveServer2 server is not started.");
 DEFINE_int32(hs2_http_port, 28000, "port on which HiveServer2 HTTP(s) client "
     "requests are served. If 0 or less, the HiveServer2 http server is not 
started.");
+DEFINE_int32(external_fe_port, 0, "port on which External Frontend requests 
are served. "
+    "If 0 or less, the External Frontend server is not started. Careful 
consideration "
+    "must be taken when enabling due to the fact that this port is currently 
always "
+    "unauthenticated.");
 
 DEFINE_int32(fe_service_threads, 64,
     "number of threads available to serve client requests");
@@ -2341,7 +2345,8 @@ void ImpalaServer::ConnectionEnd(
     }
   } else {
     DCHECK(connection_context.server_name ==  HS2_SERVER_NAME
-        || connection_context.server_name == HS2_HTTP_SERVER_NAME);
+        || connection_context.server_name == HS2_HTTP_SERVER_NAME
+        || connection_context.server_name == EXTERNAL_FRONTEND_SERVER_NAME);
     for (const TUniqueId& session_id : disconnected_sessions) {
       shared_ptr<SessionState> state;
       Status status = GetSessionState(session_id, 
SecretArg::SkipSecretCheck(), &state);
@@ -2703,8 +2708,8 @@ void ImpalaServer::ExpireQuery(ClientRequestState* crs, 
const Status& status) {
   crs->set_expired();
 }
 
-Status ImpalaServer::Start(
-    int32_t beeswax_port, int32_t hs2_port, int32_t hs2_http_port) {
+Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port,
+    int32_t hs2_http_port, int32_t external_fe_port) {
   exec_env_->SetImpalaServer(this);
 
   // We must register the HTTP handlers after registering the ImpalaServer 
with the
@@ -2801,6 +2806,28 @@ Status ImpalaServer::Start(
       hs2_server_->SetConnectionHandler(this);
     }
 
+    if (external_fe_port > 0 || (TestInfo::is_test() && external_fe_port == 
0)) {
+      boost::shared_ptr<TProcessor> external_fe_processor(
+          new ImpalaHiveServer2ServiceProcessor(handler));
+      boost::shared_ptr<TProcessorEventHandler> event_handler(
+          new RpcEventHandler("external_frontend", exec_env_->metrics()));
+      external_fe_processor->setEventHandler(event_handler);
+
+      ThriftServerBuilder builder(EXTERNAL_FRONTEND_SERVER_NAME, 
external_fe_processor,
+          external_fe_port);
+      ThriftServer* server;
+      RETURN_IF_ERROR(
+          builder.auth_provider(
+              AuthManager::GetInstance()->GetExternalFrontendAuthProvider())
+          .metrics(exec_env_->metrics())
+          .max_concurrent_connections(FLAGS_fe_service_threads)
+          .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
+          .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * 
MILLIS_PER_SEC)
+          .Build(&server));
+      external_fe_server_.reset(server);
+      external_fe_server_->SetConnectionHandler(this);
+    }
+
     if (hs2_http_port > 0 || (TestInfo::is_test() && hs2_http_port == 0)) {
       boost::shared_ptr<TProcessor> hs2_http_processor(
           new ImpalaHiveServer2ServiceProcessor(handler));
@@ -2846,6 +2873,11 @@ Status ImpalaServer::Start(
     LOG(INFO) << "Impala HiveServer2 Service (HTTP) listening on "
               << hs2_http_server_->port();
   }
+  if (external_fe_server_.get()) {
+    RETURN_IF_ERROR(external_fe_server_->Start());
+    LOG(INFO) << "Impala External Frontend Service listening on "
+              << external_fe_server_->port();
+  }
   if (beeswax_server_.get()) {
     RETURN_IF_ERROR(beeswax_server_->Start());
     LOG(INFO) << "Impala Beeswax Service listening on " << 
beeswax_server_->port();
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 4971dfe..f280374 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -209,7 +209,8 @@ class ImpalaServer : public ImpalaServiceIf,
   /// ephemeral port in tests and to not start the service in a daemon. A port 
< 0
   /// always means to not start the service. The port values can be obtained 
after
   /// Start() by calling GetBeeswaxPort() or GetHS2Port().
-  Status Start(int32_t beeswax_port, int32_t hs2_port, int32_t hs2_http_port);
+  Status Start(int32_t beeswax_port, int32_t hs2_port, int32_t hs2_http_port,
+      int32_t external_fe_port);
 
   /// Blocks until the server shuts down.
   void Join();
@@ -632,6 +633,9 @@ class ImpalaServer : public ImpalaServiceIf,
   friend class ImpalaServerTest;
   friend class QueryDriver;
 
+  // Used to identify external frontend RPC calls
+  const string EXTERNAL_FRONTEND_SERVER_NAME = "external-frontend";
+
   boost::scoped_ptr<ImpalaHttpHandler> http_handler_;
 
   /// Relevant ODBC SQL State code; for more info,
@@ -1570,6 +1574,7 @@ class ImpalaServer : public ImpalaServiceIf,
   boost::scoped_ptr<ThriftServer> beeswax_server_;
   boost::scoped_ptr<ThriftServer> hs2_server_;
   boost::scoped_ptr<ThriftServer> hs2_http_server_;
+  boost::scoped_ptr<ThriftServer> external_fe_server_;
 
   /// Flag that records if backend and/or client services have been started. 
The flag is
   /// set after all services required for the server have been started.
diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc
index 9704067..76a8157 100644
--- a/be/src/service/impalad-main.cc
+++ b/be/src/service/impalad-main.cc
@@ -53,6 +53,7 @@ using namespace impala;
 DECLARE_int32(beeswax_port);
 DECLARE_int32(hs2_port);
 DECLARE_int32(hs2_http_port);
+DECLARE_int32(external_fe_port);
 DECLARE_bool(is_coordinator);
 
 int ImpaladMain(int argc, char** argv) {
@@ -83,8 +84,8 @@ int ImpaladMain(int argc, char** argv) {
   InitRpcEventTracing(exec_env.webserver(), exec_env.rpc_mgr());
 
   boost::shared_ptr<ImpalaServer> impala_server(new ImpalaServer(&exec_env));
-  Status status =
-      impala_server->Start(FLAGS_beeswax_port, FLAGS_hs2_port, 
FLAGS_hs2_http_port);
+  Status status = impala_server->Start(FLAGS_beeswax_port, FLAGS_hs2_port,
+      FLAGS_hs2_http_port, FLAGS_external_fe_port);
   if (!status.ok()) {
     LOG(ERROR) << "Impalad services did not start correctly, exiting.  Error: "
         << status.GetDetail();
diff --git a/be/src/testutil/in-process-servers.cc 
b/be/src/testutil/in-process-servers.cc
index 337932d..7e72ff4 100644
--- a/be/src/testutil/in-process-servers.cc
+++ b/be/src/testutil/in-process-servers.cc
@@ -80,7 +80,7 @@ Status InProcessImpalaServer::StartWithClientServers(
 
   impala_server_.reset(new ImpalaServer(exec_env_.get()));
   SetCatalogIsReady();
-  RETURN_IF_ERROR(impala_server_->Start(beeswax_port, hs2_port, 
hs2_http_port));
+  RETURN_IF_ERROR(impala_server_->Start(beeswax_port, hs2_port, 
hs2_http_port_, 0));
 
   // Wait for up to 1s for the backend server to start
   RETURN_IF_ERROR(WaitForServer(FLAGS_hostname, krpc_port_, 10, 100));
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index 7767730..99d901f 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -40,7 +40,8 @@ from tests.common.impala_cluster import (ImpalaCluster, 
DEFAULT_BEESWAX_PORT,
     DEFAULT_STATE_STORE_SUBSCRIBER_PORT, DEFAULT_IMPALAD_WEBSERVER_PORT,
     DEFAULT_STATESTORED_WEBSERVER_PORT, DEFAULT_CATALOGD_WEBSERVER_PORT,
     DEFAULT_ADMISSIOND_WEBSERVER_PORT, DEFAULT_CATALOGD_JVM_DEBUG_PORT,
-    DEFAULT_IMPALAD_JVM_DEBUG_PORT, find_user_processes, run_daemon)
+    DEFAULT_EXTERNAL_FE_PORT, DEFAULT_IMPALAD_JVM_DEBUG_PORT,
+    find_user_processes, run_daemon)
 
 LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
 LOG.setLevel(level=logging.DEBUG)
@@ -137,6 +138,9 @@ parser.add_option("--enable_admission_service", 
dest="enable_admission_service",
                   help="If true, enables the Admissison Control Service - the 
cluster "
                   "will be launched with an admissiond and all coordinators 
configured "
                   "to use it for admission control.")
+parser.add_option("--enable_external_fe_support", 
dest="enable_external_fe_support",
+                  action="store_true", default=False,
+                  help="If true, impalads will start with the external_fe_port 
defined.")
 
 # For testing: list of comma-separated delays, in milliseconds, that delay 
impalad catalog
 # replica initialization. The ith delay is applied to the ith impalad.
@@ -231,6 +235,7 @@ def choose_impalad_ports(instance_num):
           'hs2_port': DEFAULT_HS2_PORT + instance_num,
           'hs2_http_port': DEFAULT_HS2_HTTP_PORT + instance_num,
           'krpc_port': DEFAULT_KRPC_PORT + instance_num,
+          'external_fe_port': DEFAULT_EXTERNAL_FE_PORT + instance_num,
           'state_store_subscriber_port':
               DEFAULT_STATE_STORE_SUBSCRIBER_PORT + instance_num,
           'webserver_port': DEFAULT_IMPALAD_WEBSERVER_PORT + instance_num}
@@ -244,6 +249,8 @@ def build_impalad_port_args(instance_num):
       "-krpc_port={krpc_port} "
       "-state_store_subscriber_port={state_store_subscriber_port} "
       "-webserver_port={webserver_port}")
+  if options.enable_external_fe_support:
+    IMPALAD_PORTS += " -external_fe_port={external_fe_port}"
   return IMPALAD_PORTS.format(**choose_impalad_ports(instance_num))
 
 
@@ -613,7 +620,8 @@ class DockerMiniClusterOperations(object):
       port_map = {DEFAULT_BEESWAX_PORT: chosen_ports['beeswax_port'],
                   DEFAULT_HS2_PORT: chosen_ports['hs2_port'],
                   DEFAULT_HS2_HTTP_PORT: chosen_ports['hs2_http_port'],
-                  DEFAULT_IMPALAD_WEBSERVER_PORT: 
chosen_ports['webserver_port']}
+                  DEFAULT_IMPALAD_WEBSERVER_PORT: 
chosen_ports['webserver_port'],
+                  DEFAULT_EXTERNAL_FE_PORT: chosen_ports['external_fe_port']}
       self.__run_container__("impalad_coord_exec", impalad_arg_lists[i], 
port_map, i,
           mem_limit=mem_limit, supports_data_cache=True)
 
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 3b6d3f1..3ac2d8b 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -1182,6 +1182,66 @@
     "key": "impala.thrift-server.hiveserver2-frontend.timedout-cnxn-requests"
   },
   {
+    "description": "The number of active External Frontend API connections to 
this Impala Daemon.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "External Frontend API Active Connections",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "impala.thrift-server.external-frontend.connections-in-use"
+  },
+  {
+    "description": "Amount of time clients of External Frontend API spent 
waiting for connection to be set up",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "External Frontend API client connection setup time",
+    "units": "TIME_US",
+    "kind": "HISTOGRAM",
+    "key": "impala.thrift-server.external-frontend.connection-setup-time"
+  },
+  {
+    "description": "Amount of time clients of External Frontend API spent 
waiting for service threads",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "External Frontend API clients' wait time for service threads",
+    "units": "TIME_US",
+    "kind": "HISTOGRAM",
+    "key": "impala.thrift-server.external-frontend.svc-thread-wait-time"
+  },
+  {
+    "description": "The total number of External Frontend API connections made 
to this Impala Daemon over its lifetime.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "External Frontend API Total Connections",
+    "units": "UNIT",
+    "kind": "COUNTER",
+    "key": "impala.thrift-server.external-frontend.total-connections"
+  },
+  {
+    "description": "The number of External Frontend API connections to this 
Impala Daemon that have been accepted and are waiting to be setup.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "External Frontend API Connections Queued for Setup",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "impala.thrift-server.external-frontend.connection-setup-queue-size"
+  },
+  {
+    "description": "The number of External Frontend API connection requests to 
this Impala Daemon that have been timed out waiting to be setup.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "External Frontend API Connection Requests Timed Out",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "impala.thrift-server.external-frontend.timedout-cnxn-requests"
+  },
+  {
     "description": "The number of active HiveServer2 HTTP API connections to 
this Impala Daemon.",
     "contexts": [
       "IMPALAD"
diff --git a/docker/impalad_coordinator/Dockerfile 
b/docker/impalad_coordinator/Dockerfile
index 3a7b445..bf6de31 100644
--- a/docker/impalad_coordinator/Dockerfile
+++ b/docker/impalad_coordinator/Dockerfile
@@ -29,6 +29,8 @@ EXPOSE 21050
 EXPOSE 28000
 # Debug webserver
 EXPOSE 25000
+# External Frontend
+EXPOSE 21150
 
 ENTRYPOINT ["/opt/impala/bin/daemon_entrypoint.sh", "/opt/impala/bin/impalad",\
      "-log_dir=/opt/impala/logs",\
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index 305b461..89167b7 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -51,6 +51,7 @@ START_DAEMON_PATH = os.path.join(IMPALA_HOME, 
'bin/start-daemon.sh')
 
 DEFAULT_BEESWAX_PORT = 21000
 DEFAULT_HS2_PORT = 21050
+DEFAULT_EXTERNAL_FE_PORT = 21150
 DEFAULT_HS2_HTTP_PORT = 28000
 DEFAULT_KRPC_PORT = 27000
 DEFAULT_CATALOG_SERVICE_PORT = 26000

Reply via email to