IMPALA-5162,IMPALA-5163: stress test support on secure clusters

This patch adds support for running the stress test
(concurrent_select.py) and loading nested data (load_nested.py) into a
Kerberized, SSL-enabled Impala cluster. It assumes the calling user
already has a valid Kerberos ticket. One way to obtain a ticket is:

1. Get access to a keytab and a krb5.conf file
2. Set KRB5_CONFIG and KRB5CCNAME appropriately
3. Run kinit(1)
4. Run load_nested.py and/or concurrent_select.py within this
   environment.

Because our Python clients already support Kerberos and SSL, we simply
need to make sure to use the correct options when calling the entry
points and initializing the clients:

Impala: Impyla
Hive: Impyla
HDFS: hdfs.ext.kerberos.KerberosClient

With this patch, I was able to manually do a short concurrent_select.py
run against a secure cluster without connection or auth errors, and I
was able to do the same with load_nested.py for a cluster that already
had TPC-H loaded.

Follow-ons for future cleanup work:

IMPALA-5263: support CA bundles when running stress test against SSL'd
             Impala

IMPALA-5264: fix InsecurePlatformWarning under stress test with SSL

Change-Id: I0daad57bb8ceeb5071b75125f11c1997ed7e0179
Reviewed-on: http://gerrit.cloudera.org:8080/6763
Reviewed-by: Matthew Mulder <[email protected]>
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/8b459dff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/8b459dff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/8b459dff

Branch: refs/heads/master
Commit: 8b459dffec9e093e87da9ab6e8b2e5a9de50a7bd
Parents: 801c95f
Author: Michael Brown <[email protected]>
Authored: Fri Mar 31 10:39:54 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue May 2 04:56:01 2017 +0000

----------------------------------------------------------------------
 testdata/bin/load_nested.py       |  2 ++
 tests/comparison/cli_options.py   |  9 ++++++
 tests/comparison/cluster.py       | 57 +++++++++++++++++++++++++---------
 tests/comparison/db_connection.py |  9 ++++--
 tests/stress/concurrent_select.py |  2 +-
 5 files changed, 61 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8b459dff/testdata/bin/load_nested.py
----------------------------------------------------------------------
diff --git a/testdata/bin/load_nested.py b/testdata/bin/load_nested.py
index b44bd04..146c0ff 100755
--- a/testdata/bin/load_nested.py
+++ b/testdata/bin/load_nested.py
@@ -298,6 +298,8 @@ if __name__ == "__main__":
   parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
   cli_options.add_logging_options(parser)
   cli_options.add_cluster_options(parser)  # --cm-host and similar args added 
here
+  cli_options.add_kerberos_options(parser)
+  cli_options.add_ssl_options(parser)
 
   parser.add_argument("-s", "--source-db", default="tpch_parquet")
   parser.add_argument("-t", "--target-db", default="tpch_nested_parquet")

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8b459dff/tests/comparison/cli_options.py
----------------------------------------------------------------------
diff --git a/tests/comparison/cli_options.py b/tests/comparison/cli_options.py
index 714d899..ca2efc6 100644
--- a/tests/comparison/cli_options.py
+++ b/tests/comparison/cli_options.py
@@ -156,6 +156,13 @@ def add_cm_options(parser):
       help='If CM manages multiple clusters, use this to specify which cluster 
to use.')
 
 
+def add_ssl_options(parser):
+  group = parser.add_argument_group('SSL Options')
+  group.add_argument(
+      '--use-ssl', action='store_true', default=False,
+      help='Use SSL to connect')
+
+
 def create_cluster(args):
   if args.cm_host:
     cluster = CmCluster(
@@ -167,6 +174,8 @@ def create_cluster(args):
   else:
     cluster = MiniCluster(args.hive_host, args.hive_port, 
args.minicluster_num_impalads)
   cluster.hadoop_user_name = args.hadoop_user_name
+  cluster.use_kerberos = getattr(args, 'use_kerberos', False)
+  cluster.use_ssl = getattr(args, 'use_ssl', False)
   return cluster
 
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8b459dff/tests/comparison/cluster.py
----------------------------------------------------------------------
diff --git a/tests/comparison/cluster.py b/tests/comparison/cluster.py
index ab1e4f3..87a4009 100644
--- a/tests/comparison/cluster.py
+++ b/tests/comparison/cluster.py
@@ -44,6 +44,7 @@ from urlparse import urlparse
 from xml.etree.ElementTree import parse as parse_xml
 from zipfile import ZipFile
 
+
 from db_connection import HiveConnection, ImpalaConnection
 from tests.common.errors import Timeout
 from tests.util.shell_util import shell as local_shell
@@ -71,7 +72,8 @@ class Cluster(object):
     self._hadoop_configs = None
     self._local_hadoop_conf_dir = None
     self.hadoop_user_name = getuser()
-    self.is_kerberized = False
+    self.use_kerberos = False
+    self.use_ssl = False
 
     self._hdfs = None
     self._yarn = None
@@ -360,9 +362,18 @@ class Hdfs(Service):
     """Returns an HdfsClient."""
     endpoint = self.cluster.get_hadoop_config("dfs.namenode.http-address",
                                               "0.0.0.0:50070")
-    if endpoint.startswith("0.0.0.0"):
-      endpoint.replace("0.0.0.0", "127.0.0.1")
-    return HdfsClient("http://%s" % endpoint, use_kerberos=False,
+    ip, port = endpoint.split(':')
+    if ip == "0.0.0.0":
+      ip = "127.0.0.1"
+    if self.cluster.use_ssl:
+      port = self.cluster.get_hadoop_config("dfs.https.port", 20102)
+      scheme = 'https'
+    else:
+      scheme = 'http'
+    endpoint = ':'.join([ip, port])
+    return HdfsClient(
+        "{scheme}://{endpoint}".format(scheme=scheme, endpoint=endpoint),
+        use_kerberos=self.cluster.use_kerberos,
         user_name=(self._admin_user_name if as_admin else 
self.cluster.hadoop_user_name))
 
   def ensure_home_dir(self, user=None):
@@ -381,12 +392,16 @@ class Hdfs(Service):
 class HdfsClient(object):
 
   def __init__(self, url, user_name=None, use_kerberos=False):
+    # Set a specific session that doesn't verify SSL certs. This is needed 
because
+    # requests doesn't like self-signed certs.
+    # TODO: Support a CA bundle.
+    s = requests.Session()
+    s.verify = False
     if use_kerberos:
-      # TODO: Have the virtualenv attempt to install a list of optional libs.
       try:
-        import kerberos
+        from hdfs.ext.kerberos import KerberosClient
       except ImportError as e:
-        if "No module named kerberos" not in str(e):
+        if "No module named requests_kerberos" not in str(e):
           raise e
         import os
         import subprocess
@@ -394,16 +409,18 @@ class HdfsClient(object):
         pip_path = os.path.join(os.environ["IMPALA_HOME"], "infra", "python", 
"env",
             "bin", "pip")
         try:
-          local_shell(pip_path + " install kerboros", stdout=subprocess.PIPE,
-              stderr=subprocess.STDOUT)
+          local_shell(pip_path + " install pykerberos==1.1.14 
requests-kerberos==0.11.0",
+                      stdout=subprocess.PIPE,
+                      stderr=subprocess.STDOUT)
           LOG.info("kerberos installation complete.")
         except Exception as e:
           LOG.error("kerberos installation failed. Try installing libkrb5-dev 
and"
               " then try again.")
           raise e
-      self._client = hdfs.ext.kerberos.KerberosClient(url, user=user_name)
+      from hdfs.ext.kerberos import KerberosClient
+      self._client = KerberosClient(url, session=s)
     else:
-      self._client = hdfs.client.InsecureClient(url, user=user_name)
+      self._client = hdfs.client.InsecureClient(url, user=user_name, session=s)
 
   def __getattr__(self, name):
     return getattr(self._client, name)
@@ -463,7 +480,7 @@ class Hive(Service):
   def connect(self, db_name=None):
     conn = HiveConnection(host_name=self.hs2_host_name, port=self.hs2_port,
         user_name=self.cluster.hadoop_user_name, db_name=db_name,
-        use_kerberos=self.cluster.is_kerberized)
+        use_kerberos=self.cluster.use_kerberos, use_ssl=self.cluster.use_ssl)
     conn.cluster = self.cluster
     return conn
 
@@ -497,7 +514,7 @@ class Impala(Service):
       impalad = choice(self.impalads)
     conn = ImpalaConnection(host_name=impalad.host_name, port=impalad.hs2_port,
         user_name=self.cluster.hadoop_user_name, db_name=db_name,
-        use_kerberos=self.cluster.is_kerberized)
+        use_kerberos=self.cluster.use_kerberos, use_ssl=self.cluster.use_ssl)
     conn.cluster = self.cluster
     return conn
 
@@ -734,9 +751,19 @@ class Impalad(object):
     return data
 
   def _request_web_page(self, relative_url, params={}, 
timeout_secs=DEFAULT_TIMEOUT):
-    url = "http://%s:%s%s" % (self.host_name, self.web_ui_port, relative_url)
+    if self.cluster.use_ssl:
+      scheme = 'https'
+    else:
+      scheme = 'http'
+    url = '{scheme}://{host}:{port}{url}'.format(
+        scheme=scheme,
+        host=self.host_name,
+        port=self.web_ui_port,
+        url=relative_url)
     try:
-      resp = requests.get(url, params=params, timeout=timeout_secs)
+      # verify=False is needed because of self-signed certifiates
+      # TODO: support a CA bundle that users could point to instead
+      resp = requests.get(url, params=params, timeout=timeout_secs, 
verify=False)
     except requests.exceptions.Timeout as e:
       raise Timeout(underlying_exception=e)
     resp.raise_for_status()

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8b459dff/tests/comparison/db_connection.py
----------------------------------------------------------------------
diff --git a/tests/comparison/db_connection.py b/tests/comparison/db_connection.py
index 7117bff..8722755 100644
--- a/tests/comparison/db_connection.py
+++ b/tests/comparison/db_connection.py
@@ -851,11 +851,13 @@ class ImpalaConnection(DbConnection):
 
   _DB_TYPE = IMPALA
   _CURSOR_CLASS = ImpalaCursor
+  _KERBEROS_SERVICE_NAME = 'impala'
   _NON_KERBEROS_AUTH_MECH = 'NOSASL'
 
-  def __init__(self, use_kerberos=False, **kwargs):
+  def __init__(self, use_kerberos=False, use_ssl=False, **kwargs):
     self._use_kerberos = use_kerberos
     self.cluster = None
+    self._use_ssl = use_ssl
     DbConnection.__init__(self, **kwargs)
 
   def clone(self, db_name):
@@ -884,7 +886,9 @@ class ImpalaConnection(DbConnection):
         password=self._password,
         database=self.db_name,
         timeout=(60 * 60),
-        auth_mechanism=('GSSAPI' if self._use_kerberos else 
self._NON_KERBEROS_AUTH_MECH))
+        auth_mechanism=('GSSAPI' if self._use_kerberos else 
self._NON_KERBEROS_AUTH_MECH),
+        kerberos_service_name=self._KERBEROS_SERVICE_NAME,
+        use_ssl=self._use_ssl)
 
 
 class HiveCursor(ImpalaCursor):
@@ -909,6 +913,7 @@ class HiveConnection(ImpalaConnection):
 
   _DB_TYPE = HIVE
   _CURSOR_CLASS = HiveCursor
+  _KERBEROS_SERVICE_NAME = 'hive'
   _NON_KERBEROS_AUTH_MECH = 'PLAIN'
 
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8b459dff/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index 2283608..7aa44b7 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -1650,6 +1650,7 @@ def main():
   cli_options.add_logging_options(parser)
   cli_options.add_cluster_options(parser)
   cli_options.add_kerberos_options(parser)
+  cli_options.add_ssl_options(parser)
   parser.add_argument(
       "--runtime-info-path",
       default=os.path.join(gettempdir(), "{cm_host}_query_runtime_info.json"),
@@ -1825,7 +1826,6 @@ def main():
             query_option=query_option, value=value))
 
   cluster = cli_options.create_cluster(args)
-  cluster.is_kerberized = args.use_kerberos
   impala = cluster.impala
   if impala.find_stopped_impalads():
     impala.restart()

Reply via email to