IMPALA-5162,IMPALA-5163: stress test support on secure clusters
This patch adds support for running the stress test
(concurrent_select.py) and loading nested data (load_nested.py) into a
Kerberized, SSL-enabled Impala cluster. It assumes the calling user
already has a valid Kerberos ticket. One way to do that is:
1. Get access to a keytab and krb5.conf
2. Set KRB5_CONFIG and KRB5CCNAME appropriately
3. Run kinit(1)
4. Run load_nested.py and/or concurrent_select.py within this
environment.
Because our Python clients already support Kerberos and SSL, we simply
need to make sure to use the correct options when calling the entry
points and initializing the clients:
- Impala: Impyla
- Hive: Impyla
- HDFS: hdfs.ext.kerberos.KerberosClient
With this patch, I was able to manually run a short concurrent_select.py
session against a secure cluster without connection or authentication
errors, and to do the same with load_nested.py against a cluster that
already had TPC-H loaded.
Follow-ons for future cleanup work:
IMPALA-5263: support CA bundles when running the stress test against
SSL-enabled Impala
IMPALA-5264: fix InsecurePlatformWarning under stress test with SSL
Change-Id: I0daad57bb8ceeb5071b75125f11c1997ed7e0179
Reviewed-on: http://gerrit.cloudera.org:8080/6763
Reviewed-by: Matthew Mulder <[email protected]>
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/8b459dff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/8b459dff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/8b459dff
Branch: refs/heads/master
Commit: 8b459dffec9e093e87da9ab6e8b2e5a9de50a7bd
Parents: 801c95f
Author: Michael Brown <[email protected]>
Authored: Fri Mar 31 10:39:54 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue May 2 04:56:01 2017 +0000
----------------------------------------------------------------------
testdata/bin/load_nested.py | 2 ++
tests/comparison/cli_options.py | 9 ++++++
tests/comparison/cluster.py | 57 +++++++++++++++++++++++++---------
tests/comparison/db_connection.py | 9 ++++--
tests/stress/concurrent_select.py | 2 +-
5 files changed, 61 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8b459dff/testdata/bin/load_nested.py
----------------------------------------------------------------------
diff --git a/testdata/bin/load_nested.py b/testdata/bin/load_nested.py
index b44bd04..146c0ff 100755
--- a/testdata/bin/load_nested.py
+++ b/testdata/bin/load_nested.py
@@ -298,6 +298,8 @@ if __name__ == "__main__":
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
cli_options.add_logging_options(parser)
cli_options.add_cluster_options(parser) # --cm-host and similar args added
here
+ cli_options.add_kerberos_options(parser)
+ cli_options.add_ssl_options(parser)
parser.add_argument("-s", "--source-db", default="tpch_parquet")
parser.add_argument("-t", "--target-db", default="tpch_nested_parquet")
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8b459dff/tests/comparison/cli_options.py
----------------------------------------------------------------------
diff --git a/tests/comparison/cli_options.py b/tests/comparison/cli_options.py
index 714d899..ca2efc6 100644
--- a/tests/comparison/cli_options.py
+++ b/tests/comparison/cli_options.py
@@ -156,6 +156,13 @@ def add_cm_options(parser):
help='If CM manages multiple clusters, use this to specify which cluster
to use.')
+def add_ssl_options(parser):
+ group = parser.add_argument_group('SSL Options')
+ group.add_argument(
+ '--use-ssl', action='store_true', default=False,
+ help='Use SSL to connect')
+
+
def create_cluster(args):
if args.cm_host:
cluster = CmCluster(
@@ -167,6 +174,8 @@ def create_cluster(args):
else:
cluster = MiniCluster(args.hive_host, args.hive_port,
args.minicluster_num_impalads)
cluster.hadoop_user_name = args.hadoop_user_name
+ cluster.use_kerberos = getattr(args, 'use_kerberos', False)
+ cluster.use_ssl = getattr(args, 'use_ssl', False)
return cluster
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8b459dff/tests/comparison/cluster.py
----------------------------------------------------------------------
diff --git a/tests/comparison/cluster.py b/tests/comparison/cluster.py
index ab1e4f3..87a4009 100644
--- a/tests/comparison/cluster.py
+++ b/tests/comparison/cluster.py
@@ -44,6 +44,7 @@ from urlparse import urlparse
from xml.etree.ElementTree import parse as parse_xml
from zipfile import ZipFile
+
from db_connection import HiveConnection, ImpalaConnection
from tests.common.errors import Timeout
from tests.util.shell_util import shell as local_shell
@@ -71,7 +72,8 @@ class Cluster(object):
self._hadoop_configs = None
self._local_hadoop_conf_dir = None
self.hadoop_user_name = getuser()
- self.is_kerberized = False
+ self.use_kerberos = False
+ self.use_ssl = False
self._hdfs = None
self._yarn = None
@@ -360,9 +362,18 @@ class Hdfs(Service):
"""Returns an HdfsClient."""
endpoint = self.cluster.get_hadoop_config("dfs.namenode.http-address",
"0.0.0.0:50070")
- if endpoint.startswith("0.0.0.0"):
- endpoint.replace("0.0.0.0", "127.0.0.1")
- return HdfsClient("http://%s" % endpoint, use_kerberos=False,
+ ip, port = endpoint.split(':')
+ if ip == "0.0.0.0":
+ ip = "127.0.0.1"
+ if self.cluster.use_ssl:
+ port = self.cluster.get_hadoop_config("dfs.https.port", 20102)
+ scheme = 'https'
+ else:
+ scheme = 'http'
+ endpoint = ':'.join([ip, port])
+ return HdfsClient(
+ "{scheme}://{endpoint}".format(scheme=scheme, endpoint=endpoint),
+ use_kerberos=self.cluster.use_kerberos,
user_name=(self._admin_user_name if as_admin else
self.cluster.hadoop_user_name))
def ensure_home_dir(self, user=None):
@@ -381,12 +392,16 @@ class Hdfs(Service):
class HdfsClient(object):
def __init__(self, url, user_name=None, use_kerberos=False):
+ # Set a specific session that doesn't verify SSL certs. This is needed
because
+ # requests doesn't like self-signed certs.
+ # TODO: Support a CA bundle.
+ s = requests.Session()
+ s.verify = False
if use_kerberos:
- # TODO: Have the virtualenv attempt to install a list of optional libs.
try:
- import kerberos
+ from hdfs.ext.kerberos import KerberosClient
except ImportError as e:
- if "No module named kerberos" not in str(e):
+ if "No module named requests_kerberos" not in str(e):
raise e
import os
import subprocess
@@ -394,16 +409,18 @@ class HdfsClient(object):
pip_path = os.path.join(os.environ["IMPALA_HOME"], "infra", "python",
"env",
"bin", "pip")
try:
- local_shell(pip_path + " install kerboros", stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
+ local_shell(pip_path + " install pykerberos==1.1.14
requests-kerberos==0.11.0",
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
LOG.info("kerberos installation complete.")
except Exception as e:
LOG.error("kerberos installation failed. Try installing libkrb5-dev
and"
" then try again.")
raise e
- self._client = hdfs.ext.kerberos.KerberosClient(url, user=user_name)
+ from hdfs.ext.kerberos import KerberosClient
+ self._client = KerberosClient(url, session=s)
else:
- self._client = hdfs.client.InsecureClient(url, user=user_name)
+ self._client = hdfs.client.InsecureClient(url, user=user_name, session=s)
def __getattr__(self, name):
return getattr(self._client, name)
@@ -463,7 +480,7 @@ class Hive(Service):
def connect(self, db_name=None):
conn = HiveConnection(host_name=self.hs2_host_name, port=self.hs2_port,
user_name=self.cluster.hadoop_user_name, db_name=db_name,
- use_kerberos=self.cluster.is_kerberized)
+ use_kerberos=self.cluster.use_kerberos, use_ssl=self.cluster.use_ssl)
conn.cluster = self.cluster
return conn
@@ -497,7 +514,7 @@ class Impala(Service):
impalad = choice(self.impalads)
conn = ImpalaConnection(host_name=impalad.host_name, port=impalad.hs2_port,
user_name=self.cluster.hadoop_user_name, db_name=db_name,
- use_kerberos=self.cluster.is_kerberized)
+ use_kerberos=self.cluster.use_kerberos, use_ssl=self.cluster.use_ssl)
conn.cluster = self.cluster
return conn
@@ -734,9 +751,19 @@ class Impalad(object):
return data
def _request_web_page(self, relative_url, params={},
timeout_secs=DEFAULT_TIMEOUT):
- url = "http://%s:%s%s" % (self.host_name, self.web_ui_port, relative_url)
+ if self.cluster.use_ssl:
+ scheme = 'https'
+ else:
+ scheme = 'http'
+ url = '{scheme}://{host}:{port}{url}'.format(
+ scheme=scheme,
+ host=self.host_name,
+ port=self.web_ui_port,
+ url=relative_url)
try:
- resp = requests.get(url, params=params, timeout=timeout_secs)
+ # verify=False is needed because of self-signed certificates
+ # TODO: support a CA bundle that users could point to instead
+ resp = requests.get(url, params=params, timeout=timeout_secs,
verify=False)
except requests.exceptions.Timeout as e:
raise Timeout(underlying_exception=e)
resp.raise_for_status()
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8b459dff/tests/comparison/db_connection.py
----------------------------------------------------------------------
diff --git a/tests/comparison/db_connection.py
b/tests/comparison/db_connection.py
index 7117bff..8722755 100644
--- a/tests/comparison/db_connection.py
+++ b/tests/comparison/db_connection.py
@@ -851,11 +851,13 @@ class ImpalaConnection(DbConnection):
_DB_TYPE = IMPALA
_CURSOR_CLASS = ImpalaCursor
+ _KERBEROS_SERVICE_NAME = 'impala'
_NON_KERBEROS_AUTH_MECH = 'NOSASL'
- def __init__(self, use_kerberos=False, **kwargs):
+ def __init__(self, use_kerberos=False, use_ssl=False, **kwargs):
self._use_kerberos = use_kerberos
self.cluster = None
+ self._use_ssl = use_ssl
DbConnection.__init__(self, **kwargs)
def clone(self, db_name):
@@ -884,7 +886,9 @@ class ImpalaConnection(DbConnection):
password=self._password,
database=self.db_name,
timeout=(60 * 60),
- auth_mechanism=('GSSAPI' if self._use_kerberos else
self._NON_KERBEROS_AUTH_MECH))
+ auth_mechanism=('GSSAPI' if self._use_kerberos else
self._NON_KERBEROS_AUTH_MECH),
+ kerberos_service_name=self._KERBEROS_SERVICE_NAME,
+ use_ssl=self._use_ssl)
class HiveCursor(ImpalaCursor):
@@ -909,6 +913,7 @@ class HiveConnection(ImpalaConnection):
_DB_TYPE = HIVE
_CURSOR_CLASS = HiveCursor
+ _KERBEROS_SERVICE_NAME = 'hive'
_NON_KERBEROS_AUTH_MECH = 'PLAIN'
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8b459dff/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py
b/tests/stress/concurrent_select.py
index 2283608..7aa44b7 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -1650,6 +1650,7 @@ def main():
cli_options.add_logging_options(parser)
cli_options.add_cluster_options(parser)
cli_options.add_kerberos_options(parser)
+ cli_options.add_ssl_options(parser)
parser.add_argument(
"--runtime-info-path",
default=os.path.join(gettempdir(), "{cm_host}_query_runtime_info.json"),
@@ -1825,7 +1826,6 @@ def main():
query_option=query_option, value=value))
cluster = cli_options.create_cluster(args)
- cluster.is_kerberized = args.use_kerberos
impala = cluster.impala
if impala.find_stopped_impalads():
impala.restart()