This is an automated email from the ASF dual-hosted git repository. qianzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push: new c66aaf4 Added Mesos authentication to the Mesos cli c66aaf4 is described below commit c66aaf47768aa0e5b66ee9d830f634f4dd682c43 Author: Andreas Peters <a...@aventer.biz> AuthorDate: Wed Jul 21 21:05:39 2021 +0800 Added Mesos authentication to the Mesos cli The following points I have done: - Add authentication against Mesos master and agent. - Add option to skip SSL verification of the mesos-agent. - Change the order of "task list" to get back more running states. This closes #383 --- src/python/cli_new/README.md | 10 +++ src/python/cli_new/lib/cli/config.py | 86 ++++++++++++++++++++++++ src/python/cli_new/lib/cli/http.py | 72 ++++++++++---------- src/python/cli_new/lib/cli/mesos.py | 39 +++++++---- src/python/cli_new/lib/cli/plugins/agent/main.py | 3 +- src/python/cli_new/lib/cli/plugins/task/main.py | 9 ++- src/python/cli_new/lib/cli/tests/agent.py | 2 +- src/python/cli_new/lib/cli/tests/base.py | 4 +- src/python/cli_new/lib/cli/tests/task.py | 10 +-- src/python/cli_new/pip-requirements.txt | 2 - src/python/cli_new/tox.ini | 3 +- 11 files changed, 176 insertions(+), 64 deletions(-) diff --git a/src/python/cli_new/README.md b/src/python/cli_new/README.md index 0e6c716..7ac22b2 100644 --- a/src/python/cli_new/README.md +++ b/src/python/cli_new/README.md @@ -104,6 +104,9 @@ plugins = [ # `address` or `zookeeper` field, but not both. For example: [master] address = "10.10.0.30:5050" + principal = "username" + secret = "password" + # The `zookeeper` field has an `addresses` array and a `path` field. # [master.zookeeper] # addresses = [ @@ -112,6 +115,13 @@ plugins = [ # "10.10.0.33:5050" # ] # path = "/mesos" + +[agent] + ssl = true + ssl_verify = false + principal = "username" + secret = "password" + timeout = 5 ``` You can override the location of this configuration file using diff --git a/src/python/cli_new/lib/cli/config.py b/src/python/cli_new/lib/cli/config.py index 7f41736..fa6c8ff 100644 --- a/src/python/cli_new/lib/cli/config.py +++ b/src/python/cli_new/lib/cli/config.py @@ -21,6 +21,7 @@ Config class to manage the configuration file. import os import toml +import requests import cli from cli.constants import DEFAULT_MASTER_IP @@ -119,6 +120,79 @@ class Config(): return master + def principal(self): + """ + Return the principal in the configuration file + """ + return self.data["master"].get("principal") + + def secret(self): + """ + Return the secret in the configuration file + """ + return self.data["master"].get("secret") + + def agent_ssl(self, default=False): + """ + Return if the agent support ssl + """ + if "agent" in self.data: + agent_ssl = self.data["agent"].get("ssl", default) + if not isinstance(agent_ssl, bool): + raise CLIException("The 'agent->ssl' field" + " must be True/False") + + return agent_ssl + + return default + + def agent_ssl_verify(self, default=False): + """ + Return if the ssl certificate should be verified + """ + if "agent" in self.data: + ssl_verify = self.data["agent"].get("ssl_verify", default) + if not isinstance(ssl_verify, bool): + raise CLIException("The 'agent->ssl_verify' field" + " must be True/False") + + return ssl_verify + + return default + + def agent_timeout(self, default=5): + """ + Return the connection timeout of the agent + """ + if "agent" in self.data: + timeout = self.data["agent"].get("timeout", default) + if not isinstance(timeout, int): + raise CLIException("The 'agent->timeout' field" + " must be a number in seconds") + + return timeout + + return default + + + def agent_principal(self): + """ + Return the principal in the configuration file + """ + if "agent" in self.data: + return self.data["agent"].get("principal") + + return None + + def agent_secret(self): + """ + Return the secret in the configuration file + """ + if "agent" in self.data: + return self.data["agent"].get("secret") + + return None + def plugins(self): """ Parse the plugins listed in the configuration file and return them. @@ -137,3 +211,15 @@ class Config(): return self.data["plugins"] return [] + + def authentication_header(self): + """ + Return the BasicAuth authentication header + """ + if (self.agent_principal() is not None + and self.agent_secret() is not None): + return requests.auth.HTTPBasicAuth( + self.agent_principal(), + self.agent_secret() + ) + return None diff --git a/src/python/cli_new/lib/cli/http.py b/src/python/cli_new/lib/cli/http.py index 10fd889..c39935f 100644 --- a/src/python/cli_new/lib/cli/http.py +++ b/src/python/cli_new/lib/cli/http.py @@ -19,70 +19,70 @@ A collection of http related functions used by the CLI and its Plugins. """ import json -import urllib.request -import urllib.error -import urllib.parse -import time +from urllib.parse import urlencode +import urllib3 import cli from cli.exceptions import CLIException +# Disable all SSL warnings. These are not necessary, as the user has +# the option to disable SSL verification. +urllib3.disable_warnings() -def read_endpoint(addr, endpoint, query=None): +def read_endpoint(addr, endpoint, config, query=None): """ Read the specified endpoint and return the results. """ + try: addr = cli.util.sanitize_address(addr) except Exception as exception: raise CLIException("Unable to sanitize address '{addr}': {error}" .format(addr=addr, error=str(exception))) - try: url = "{addr}/{endpoint}".format(addr=addr, endpoint=endpoint) if query is not None: - url += "?{query}".format(query=urllib.parse.urlencode(query)) - http_response = urllib.request.urlopen(url).read().decode("utf-8") + url += "?{query}".format(query=urlencode(query)) + if config.principal() is not None and config.secret() is not None: + headers = urllib3.make_headers( + basic_auth=config.principal() + ":" + config.secret() + ) + else: + headers = None + http = urllib3.PoolManager() + http_response = http.request( + 'GET', + url, + headers=headers, + timeout=config.agent_timeout() + ) + return http_response.data.decode('utf-8') + except Exception as exception: raise CLIException("Unable to open url '{url}': {error}" .format(url=url, error=str(exception))) - return http_response - -def get_json(addr, endpoint, condition=None, timeout=5, query=None): +def get_json(addr, endpoint, config, condition=None, query=None): """ Return the contents of the 'endpoint' at 'addr' as JSON data subject to the condition specified in 'condition'. If we are - unable to read the data or unable to meet the condition within - 'timeout' seconds we throw an error. + unable to read the data we throw an error. """ - start_time = time.time() - while True: - data = None + data = read_endpoint(addr, endpoint, config, query) - try: - data = read_endpoint(addr, endpoint, query) - except Exception as exception: - pass - - if data: - try: - data = json.loads(data) - except Exception as exception: - raise CLIException("Could not load JSON from '{data}': {error}" - .format(data=data, error=str(exception))) - - if not condition: - return data + try: + data = json.loads(data) + except Exception as exception: + raise CLIException("Could not load JSON from '{data}': {error}" + .format(data=data, error=str(exception))) - if condition(data): - return data + if not condition: + return data - if time.time() - start_time > timeout: - raise CLIException("Failed to get data within {seconds} seconds" - .format(seconds=str(timeout))) + if condition(data): + return data - time.sleep(0.1) + return data diff --git a/src/python/cli_new/lib/cli/mesos.py b/src/python/cli_new/lib/cli/mesos.py index a6fd95f..44a66db 100644 --- a/src/python/cli_new/lib/cli/mesos.py +++ b/src/python/cli_new/lib/cli/mesos.py @@ -44,13 +44,13 @@ from mesos.exceptions import MesosException from mesos.exceptions import MesosHTTPException -def get_agent_address(agent_id, master): +def get_agent_address(agent_id, master, config): """ Given a master and an agent id, return the agent address by checking the /slaves endpoint of the master. """ try: - agents = http.get_json(master, "slaves")["slaves"] + agents = http.get_json(master, "slaves", config)["slaves"] except Exception as exception: raise CLIException("Could not open '/slaves'" " endpoint at '{addr}': {error}" @@ -62,15 +62,14 @@ def get_agent_address(agent_id, master): raise CLIException("Unable to find agent '{id}'".format(id=agent_id)) -def get_agents(master): +def get_agents(master, config): """ Get the agents in a Mesos cluster. """ endpoint = "slaves" key = "slaves" - try: - data = http.get_json(master, endpoint) + data = http.get_json(master, endpoint, config) except Exception as exception: raise CLIException( "Could not open '/{endpoint}' on master: {error}" @@ -114,15 +113,18 @@ def get_container_id(task): " Please try again.") -def get_tasks(master, query=None): +def get_tasks(master, config, query=None): """ Get the tasks in a Mesos cluster. """ endpoint = "tasks" key = "tasks" + if query is None: + query = {'order':'asc'} + try: - data = http.get_json(master, endpoint, query=query) + data = http.get_json(master, endpoint, config, query=query) except Exception as exception: raise CLIException( "Could not open '/{endpoint}' with query parameters: {query}" @@ -162,7 +164,7 @@ class TaskIO(): HEARTBEAT_INTERVAL = 30 HEARTBEAT_INTERVAL_NANOSECONDS = HEARTBEAT_INTERVAL * 1000000000 - def __init__(self, master, task_id): + def __init__(self, master, config, task_id): # Get the task and make sure its container was launched by the UCR. # Since task's containers are launched by the UCR by default, we want # to allow most tasks to pass through unchecked. The only exception is @@ -170,7 +172,7 @@ class TaskIO(): # "MESOS". Having a type of "MESOS" implies that it was launched by the # UCR -- all other types imply it was not. try: - tasks = get_tasks(master, query={'task_id': task_id}) + tasks = get_tasks(master, config, query={'task_id': task_id}) except Exception as exception: raise CLIException("Unable to get task with ID {task_id}" " from leading master '{master}': {error}" @@ -199,11 +201,13 @@ class TaskIO(): "This command is only supported for tasks" " launched by the Universal Container Runtime (UCR).") + # Get the scheme of the agent + scheme = "https://" if config.agent_ssl() else "http://" + # Get the URL to the agent running the task. agent_addr = util.sanitize_address( - get_agent_address(task_obj["slave_id"], master)) + scheme + get_agent_address(task_obj["slave_id"], master, config)) self.agent_url = mesos.http.simple_urljoin(agent_addr, "api/v1") - # Get the agent's task path by checking the `state` endpoint. try: self.container_id = get_container_id(task_obj) @@ -253,6 +257,7 @@ class TaskIO(): self.interactive = False self.tty = False self.output_thread_entry_point = None + self.config = config # Allow an exit sequence to be used to break the CLIs attachment to # the remote task. Depending on the call, this may be disabled, or @@ -323,7 +328,6 @@ class TaskIO(): self.args = _args self.interactive = _interactive self.tty = _tty - # Override the container ID with the current container ID as the # parent, and generate a new UUID for the nested container used to # run commands passed to `task exec`. @@ -413,6 +417,7 @@ class TaskIO(): 'wait_container': { 'container_id': self.container_id}} req_extra_args = { + 'verify': self.config.agent_ssl_verify(), 'additional_headers': { 'Content-Type': 'application/json', 'Accept': 'application/json'}} @@ -421,6 +426,7 @@ class TaskIO(): response = resource.request( mesos.http.METHOD_POST, data=json.dumps(message), + auth=self.config.authentication_header(), retry=False, timeout=None, **req_extra_args) @@ -511,6 +517,7 @@ class TaskIO(): req_extra_args = { 'stream': True, + 'verify': self.config.agent_ssl_verify(), 'additional_headers': { 'Content-Type': 'application/json', 'Accept': 'application/recordio', @@ -523,6 +530,7 @@ class TaskIO(): data=json.dumps(message), retry=False, timeout=None, + auth=self.config.authentication_header(), **req_extra_args) except MesosHTTPException as e: text = "I/O switchboard server was disabled for this container" @@ -539,6 +547,7 @@ class TaskIO(): nested container and attach to its output stream. The output stream is then sent back in the response. """ + message = { 'type': "LAUNCH_NESTED_CONTAINER_SESSION", 'launch_nested_container_session': { @@ -557,11 +566,11 @@ class TaskIO(): req_extra_args = { 'stream': True, + 'verify': self.config.agent_ssl_verify(), 'additional_headers': { 'Content-Type': 'application/json', 'Accept': 'application/recordio', 'Message-Accept': 'application/json'}} - resource = mesos.http.Resource(self.agent_url) try: response = resource.request( @@ -569,6 +578,7 @@ class TaskIO(): data=json.dumps(message), retry=False, timeout=None, + auth=self.config.authentication_header(), **req_extra_args) except MesosException as exception: raise CLIException("{error}".format(error=exception)) @@ -654,6 +664,7 @@ class TaskIO(): yield record req_extra_args = { + 'verify': self.config.agent_ssl_verify(), 'additional_headers': { 'Content-Type': 'application/recordio', 'Message-Content-Type': 'application/json', @@ -682,6 +693,7 @@ class TaskIO(): mesos.http.METHOD_POST, data=_initial_input_streamer(), retry=False, + auth=self.config.authentication_header(), **req_extra_args) except MesosHTTPException as e: if not e.response.status_code == 500: @@ -698,6 +710,7 @@ class TaskIO(): data=_input_streamer(), retry=False, timeout=None, + auth=self.config.authentication_header(), **req_extra_args) def _detect_exit_sequence(self, chunk): diff --git a/src/python/cli_new/lib/cli/plugins/agent/main.py b/src/python/cli_new/lib/cli/plugins/agent/main.py index 4a658f9..158e83a 100644 --- a/src/python/cli_new/lib/cli/plugins/agent/main.py +++ b/src/python/cli_new/lib/cli/plugins/agent/main.py @@ -53,12 +53,13 @@ class Agent(PluginBase): # pylint: disable=unused-argument try: master = self.config.master() + config = self.config except Exception as exception: raise CLIException("Unable to get leading master address: {error}" .format(error=exception)) try: - agents = get_agents(master) + agents = get_agents(master, config) except Exception as exception: raise CLIException("Unable to get agents from leading" " master '{master}': {error}" diff --git a/src/python/cli_new/lib/cli/plugins/task/main.py b/src/python/cli_new/lib/cli/plugins/task/main.py index 00167f8..d223746 100644 --- a/src/python/cli_new/lib/cli/plugins/task/main.py +++ b/src/python/cli_new/lib/cli/plugins/task/main.py @@ -75,11 +75,12 @@ class Task(PluginBase): """ try: master = self.config.master() + config = self.config except Exception as exception: raise CLIException("Unable to get leading master address: {error}" .format(error=exception)) - task_io = TaskIO(master, argv["<task-id>"]) + task_io = TaskIO(master, config, argv["<task-id>"]) return task_io.attach(argv["--no-stdin"]) @@ -89,11 +90,12 @@ class Task(PluginBase): """ try: master = self.config.master() + config = self.config except Exception as exception: raise CLIException("Unable to get leading master address: {error}" .format(error=exception)) - task_io = TaskIO(master, argv["<task-id>"]) + task_io = TaskIO(master, config, argv["<task-id>"]) return task_io.exec(argv["<command>"], argv["<args>"], argv["--interactive"], @@ -106,12 +108,13 @@ class Task(PluginBase): # pylint: disable=unused-argument try: master = self.config.master() + config = self.config except Exception as exception: raise CLIException("Unable to get leading master address: {error}" .format(error=exception)) try: - tasks = get_tasks(master) + tasks = get_tasks(master, config) except Exception as exception: raise CLIException("Unable to get tasks from leading" " master '{master}': {error}" diff --git a/src/python/cli_new/lib/cli/tests/agent.py b/src/python/cli_new/lib/cli/tests/agent.py index 31e3e3f..8ff6842 100644 --- a/src/python/cli_new/lib/cli/tests/agent.py +++ b/src/python/cli_new/lib/cli/tests/agent.py @@ -48,7 +48,7 @@ class TestAgentPlugin(CLITestCase): # Open the master's `/slaves` endpoint and read the # agents' information ourselves. - agents = http.get_json(master.addr, 'slaves')["slaves"] + agents = http.get_json(master.addr, None, 'slaves')["slaves"] self.assertEqual(type(agents), list) self.assertEqual(len(agents), 1) diff --git a/src/python/cli_new/lib/cli/tests/base.py b/src/python/cli_new/lib/cli/tests/base.py index e3104fe..980c00b 100644 --- a/src/python/cli_new/lib/cli/tests/base.py +++ b/src/python/cli_new/lib/cli/tests/base.py @@ -346,7 +346,7 @@ class Task(Executable): to return data subject to 'condition'. """ try: - data = http.get_json(self.flags["master"], "slaves") + data = http.get_json(self.flags["master"], None, "slaves") except Exception as exception: raise CLIException("Could not get '/slaves' endpoint" " as JSON: {error}" @@ -510,7 +510,7 @@ def wait_for_task(master, name, state, delay=1): """ @retry(wait=wait_fixed(0.2), stop=stop_after_delay(delay)) def _wait_for_task(): - tasks = http.get_json(master.addr, "tasks")["tasks"] + tasks = http.get_json(master.addr, None, "tasks")["tasks"] for task in tasks: if task["name"] == name and task["state"] == state: return task diff --git a/src/python/cli_new/lib/cli/tests/task.py b/src/python/cli_new/lib/cli/tests/task.py index b846ee8..5511165 100644 --- a/src/python/cli_new/lib/cli/tests/task.py +++ b/src/python/cli_new/lib/cli/tests/task.py @@ -72,7 +72,7 @@ class TestTaskPlugin(CLITestCase): .format(name=task.name, state="TASK_RUNNING", error=exception)) try: - tasks = http.get_json(master.addr, "tasks")["tasks"] + tasks = http.get_json(master.addr, None, "tasks")["tasks"] except Exception as exception: raise CLIException( "Could not get tasks from '/{endpoint}' on master: {error}" @@ -116,7 +116,7 @@ class TestTaskPlugin(CLITestCase): .format(name=task.name, state="TASK_RUNNING", error=exception)) try: - tasks = http.get_json(master.addr, "tasks")["tasks"] + tasks = http.get_json(master.addr, None, "tasks")["tasks"] except Exception as exception: raise CLIException( "Could not get tasks from '/{endpoint}' on master: {error}" @@ -162,7 +162,7 @@ class TestTaskPlugin(CLITestCase): .format(name=task.name, state="TASK_RUNNING", error=exception)) try: - tasks = http.get_json(master.addr, "tasks")["tasks"] + tasks = http.get_json(master.addr, None, "tasks")["tasks"] except Exception as exception: raise CLIException( "Could not get tasks from '/{endpoint}' on master: {error}" @@ -210,7 +210,7 @@ class TestTaskPlugin(CLITestCase): .format(name=task.name, state="TASK_RUNNING", error=exception)) try: - tasks = http.get_json(master.addr, "tasks")["tasks"] + tasks = http.get_json(master.addr, None, "tasks")["tasks"] except Exception as exception: raise CLIException( "Could not get tasks from '/{endpoint}' on master: {error}" @@ -281,7 +281,7 @@ class TestTaskPlugin(CLITestCase): .format(name=task2.name, state=task2_state, error=exception)) try: - tasks = http.get_json(master.addr, "tasks")["tasks"] + tasks = http.get_json(master.addr, None, "tasks")["tasks"] except Exception as exception: raise CLIException( "Could not get tasks from '/{endpoint}' on master: {error}" diff --git a/src/python/cli_new/pip-requirements.txt b/src/python/cli_new/pip-requirements.txt index 4f512a4..f0642b9 100644 --- a/src/python/cli_new/pip-requirements.txt +++ b/src/python/cli_new/pip-requirements.txt @@ -1,11 +1,9 @@ -astroid==2.0.4 backports.functools-lru-cache==1.2.1 configparser==3.5.0 docopt==0.6.2 isort==4.2.5 kazoo==2.5.0 lazy-object-proxy==1.2.2 -mccabe==0.5.2 parse==1.8.0 Pygments==2.1.3 PyInstaller==3.4 diff --git a/src/python/cli_new/tox.ini b/src/python/cli_new/tox.ini index 3aa93a6..1d76fe4 100644 --- a/src/python/cli_new/tox.ini +++ b/src/python/cli_new/tox.ini @@ -4,7 +4,7 @@ # and then run "tox" from this directory. [tox] -envlist = {py3,py36,py37}-{lint,test} +envlist = {py3,py36,py37,py38}-{lint,test} skipsdist = true [testenv] @@ -12,6 +12,7 @@ basepython = py3: python3 py36: python3.6 py37: python3.7 + py38: python3.8 deps = -rpip-requirements.txt test: coverage==4.5.1