http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/bin/cli/cli_factory.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli/cli_factory.py b/airflow/bin/cli/cli_factory.py deleted file mode 100644 index 167643b..0000000 --- a/airflow/bin/cli/cli_factory.py +++ /dev/null @@ -1,493 +0,0 @@ -from collections import namedtuple -from cli import * -from dateutil.parser import parse as parsedate -from airflow import settings -from airflow import conf - - -Arg = namedtuple( - 'Arg', ['flags', 'help', 'action', 'default', 'nargs', 'type', 'choices', 'metavar']) -Arg.__new__.__defaults__ = (None, None, None, None, None, None, None) - - -class CLIFactory(object): - args = { - # Shared - 'dag_id': Arg(("dag_id",), "The id of the dag"), - 'task_id': Arg(("task_id",), "The id of the task"), - 'execution_date': Arg( - ("execution_date",), help="The execution date of the DAG", - type=parsedate), - 'task_regex': Arg( - ("-t", "--task_regex"), - "The regex to filter specific task_ids to backfill (optional)"), - 'subdir': Arg( - ("-sd", "--subdir"), - "File location or directory from which to look for the dag", - default=settings.DAGS_FOLDER), - 'start_date': Arg( - ("-s", "--start_date"), "Override start_date YYYY-MM-DD", - type=parsedate), - 'end_date': Arg( - ("-e", "--end_date"), "Override end_date YYYY-MM-DD", - type=parsedate), - 'dry_run': Arg( - ("-dr", "--dry_run"), "Perform a dry run", "store_true"), - 'pid': Arg( - ("--pid", ), "PID file location", - nargs='?'), - 'daemon': Arg( - ("-D", "--daemon"), "Daemonize instead of running " - "in the foreground", - "store_true"), - 'stderr': Arg( - ("--stderr", ), "Redirect stderr to this file"), - 'stdout': Arg( - ("--stdout", ), "Redirect stdout to this file"), - 'log_file': Arg( - ("-l", "--log-file"), "Location of the log file"), - - # backfill - 'mark_success': Arg( - ("-m", "--mark_success"), - "Mark jobs as succeeded without running them", "store_true"), - 'local': Arg( - ("-l", "--local"), - "Run the task using the LocalExecutor", "store_true"), - 'donot_pickle': Arg( - ("-x", "--donot_pickle"), ( - "Do not attempt to pickle the DAG object to send over " - "to the workers, just tell the workers to run their version " - "of the code."), - "store_true"), - 'include_adhoc': Arg( - ("-a", "--include_adhoc"), - "Include dags with the adhoc parameter.", "store_true"), - 'bf_ignore_dependencies': Arg( - ("-i", "--ignore_dependencies"), - ( - "Skip upstream tasks, run only the tasks " - "matching the regexp. 
Only works in conjunction " - "with task_regex"), - "store_true"), - 'bf_ignore_first_depends_on_past': Arg( - ("-I", "--ignore_first_depends_on_past"), - ( - "Ignores depends_on_past dependencies for the first " - "set of tasks only (subsequent executions in the backfill " - "DO respect depends_on_past)."), - "store_true"), - 'pool': Arg(("--pool",), "Resource pool to use"), - # list_tasks - 'tree': Arg(("-t", "--tree"), "Tree view", "store_true"), - # list_dags - 'report': Arg( - ("-r", "--report"), "Show DagBag loading report", "store_true"), - # clear - 'upstream': Arg( - ("-u", "--upstream"), "Include upstream tasks", "store_true"), - 'only_failed': Arg( - ("-f", "--only_failed"), "Only failed jobs", "store_true"), - 'only_running': Arg( - ("-r", "--only_running"), "Only running jobs", "store_true"), - 'downstream': Arg( - ("-d", "--downstream"), "Include downstream tasks", "store_true"), - 'no_confirm': Arg( - ("-c", "--no_confirm"), - "Do not request confirmation", "store_true"), - 'exclude_subdags': Arg( - ("-x", "--exclude_subdags"), - "Exclude subdags", "store_true"), - # trigger_dag - 'run_id': Arg(("-r", "--run_id"), "Helps to identify this run"), - 'conf': Arg( - ('-c', '--conf'), - "JSON string that gets pickled into the DagRun's conf attribute"), - 'exec_date': Arg( - ("-e", "--exec_date"), help="The execution date of the DAG", - type=parsedate), - # pool - 'pool_set': Arg( - ("-s", "--set"), - nargs=3, - metavar=('NAME', 'SLOT_COUNT', 'POOL_DESCRIPTION'), - help="Set pool slot count and description, respectively"), - 'pool_get': Arg( - ("-g", "--get"), - metavar='NAME', - help="Get pool info"), - 'pool_delete': Arg( - ("-x", "--delete"), - metavar="NAME", - help="Delete a pool"), - # variables - 'set': Arg( - ("-s", "--set"), - nargs=2, - metavar=('KEY', 'VAL'), - help="Set a variable"), - 'get': Arg( - ("-g", "--get"), - metavar='KEY', - help="Get value of a variable"), - 'default': Arg( - ("-d", "--default"), - metavar="VAL", - default=None, - help="Default value returned if variable does not exist"), - 'json': Arg( - ("-j", "--json"), - help="Deserialize JSON variable", - action="store_true"), - 'var_import': Arg( - ("-i", "--import"), - metavar="FILEPATH", - help="Import variables from JSON file"), - 'var_export': Arg( - ("-e", "--export"), - metavar="FILEPATH", - help="Export variables to JSON file"), - 'var_delete': Arg( - ("-x", "--delete"), - metavar="KEY", - help="Delete a variable"), - # kerberos - 'principal': Arg( - ("principal",), "kerberos principal", - nargs='?', default=conf.get('kerberos', 'principal')), - 'keytab': Arg( - ("-kt", "--keytab"), "keytab", - nargs='?', default=conf.get('kerberos', 'keytab')), - # run - # TODO(aoen): "force" is a poor choice of name here since it implies it overrides - # all dependencies (not just past success), e.g. the ignore_depends_on_past - # dependency. This flag should be deprecated and renamed to 'ignore_ti_state' and - # the "ignore_all_dependencies" command should be called the"force" command - # instead. - 'force': Arg( - ("-f", "--force"), - "Ignore previous task instance state, rerun regardless if task already " - "succeeded/failed", - "store_true"), - 'raw': Arg(("-r", "--raw"), argparse.SUPPRESS, "store_true"), - 'ignore_all_dependencies': Arg( - ("-A", "--ignore_all_dependencies"), - "Ignores all non-critical dependencies, including ignore_ti_state and " - "ignore_task_deps", - "store_true"), - # TODO(aoen): ignore_dependencies is a poor choice of name here because it is too - # vague (e.g. 
a task being in the appropriate state to be run is also a dependency - # but is not ignored by this flag), the name 'ignore_task_dependencies' is - # slightly better (as it ignores all dependencies that are specific to the task), - # so deprecate the old command name and use this instead. - 'ignore_dependencies': Arg( - ("-i", "--ignore_dependencies"), - "Ignore task-specific dependencies, e.g. upstream, depends_on_past, and " - "retry delay dependencies", - "store_true"), - 'ignore_depends_on_past': Arg( - ("-I", "--ignore_depends_on_past"), - "Ignore depends_on_past dependencies (but respect " - "upstream dependencies)", - "store_true"), - 'ship_dag': Arg( - ("--ship_dag",), - "Pickles (serializes) the DAG and ships it to the worker", - "store_true"), - 'kubernetes_mode': Arg( - ("-km", "--kubernetes_mode"), - "Pod will not contact postgres instance. should only be used for kubernetes tasks", - "store_true"), - 'pickle': Arg( - ("-p", "--pickle"), - "Serialized pickle object of the entire dag (used internally)"), - 'job_id': Arg(("-j", "--job_id"), argparse.SUPPRESS), - 'cfg_path': Arg( - ("--cfg_path", ), "Path to config file to use instead of airflow.cfg"), - # webserver - 'port': Arg( - ("-p", "--port"), - default=conf.get('webserver', 'WEB_SERVER_PORT'), - type=int, - help="The port on which to run the server"), - 'ssl_cert': Arg( - ("--ssl_cert", ), - default=conf.get('webserver', 'WEB_SERVER_SSL_CERT'), - help="Path to the SSL certificate for the webserver"), - 'ssl_key': Arg( - ("--ssl_key", ), - default=conf.get('webserver', 'WEB_SERVER_SSL_KEY'), - help="Path to the key to use with the SSL certificate"), - 'workers': Arg( - ("-w", "--workers"), - default=conf.get('webserver', 'WORKERS'), - type=int, - help="Number of workers to run the webserver on"), - 'workerclass': Arg( - ("-k", "--workerclass"), - default=conf.get('webserver', 'WORKER_CLASS'), - choices=['sync', 'eventlet', 'gevent', 'tornado'], - help="The worker class to use for Gunicorn"), - 'worker_timeout': Arg( - ("-t", "--worker_timeout"), - default=conf.get('webserver', 'WEB_SERVER_WORKER_TIMEOUT'), - type=int, - help="The timeout for waiting on webserver workers"), - 'hostname': Arg( - ("-hn", "--hostname"), - default=conf.get('webserver', 'WEB_SERVER_HOST'), - help="Set the hostname on which to run the web server"), - 'debug': Arg( - ("-d", "--debug"), - "Use the server that ships with Flask in debug mode", - "store_true"), - 'access_logfile': Arg( - ("-A", "--access_logfile"), - default=conf.get('webserver', 'ACCESS_LOGFILE'), - help="The logfile to store the webserver access log. Use '-' to print to " - "stderr."), - 'error_logfile': Arg( - ("-E", "--error_logfile"), - default=conf.get('webserver', 'ERROR_LOGFILE'), - help="The logfile to store the webserver error log. Use '-' to print to " - "stderr."), - # resetdb - 'yes': Arg( - ("-y", "--yes"), - "Do not prompt to confirm reset. 
Use with care!", - "store_true", - default=False), - # scheduler - 'dag_id_opt': Arg(("-d", "--dag_id"), help="The id of the dag to run"), - 'run_duration': Arg( - ("-r", "--run-duration"), - default=None, type=int, - help="Set number of seconds to execute before exiting"), - 'num_runs': Arg( - ("-n", "--num_runs"), - default=-1, type=int, - help="Set the number of runs to execute before exiting"), - # worker - 'do_pickle': Arg( - ("-p", "--do_pickle"), - default=False, - help=( - "Attempt to pickle the DAG object to send over " - "to the workers, instead of letting workers run their version " - "of the code."), - action="store_true"), - 'queues': Arg( - ("-q", "--queues"), - help="Comma delimited list of queues to serve", - default=conf.get('celery', 'DEFAULT_QUEUE')), - 'concurrency': Arg( - ("-c", "--concurrency"), - type=int, - help="The number of worker processes", - default=conf.get('celery', 'celeryd_concurrency')), - # flower - 'broker_api': Arg(("-a", "--broker_api"), help="Broker api"), - 'flower_hostname': Arg( - ("-hn", "--hostname"), - default=conf.get('celery', 'FLOWER_HOST'), - help="Set the hostname on which to run the server"), - 'flower_port': Arg( - ("-p", "--port"), - default=conf.get('celery', 'FLOWER_PORT'), - type=int, - help="The port on which to run the server"), - 'flower_conf': Arg( - ("-fc", "--flower_conf"), - help="Configuration file for flower"), - 'task_params': Arg( - ("-tp", "--task_params"), - help="Sends a JSON params dict to the task"), - # connections - 'list_connections': Arg( - ('-l', '--list'), - help='List all connections', - action='store_true'), - 'add_connection': Arg( - ('-a', '--add'), - help='Add a connection', - action='store_true'), - 'delete_connection': Arg( - ('-d', '--delete'), - help='Delete a connection', - action='store_true'), - 'conn_id': Arg( - ('--conn_id',), - help='Connection id, required to add/delete a connection', - type=str), - 'conn_uri': Arg( - ('--conn_uri',), - help='Connection URI, required to add a connection', - type=str), - 'conn_extra': Arg( - ('--conn_extra',), - help='Connection `Extra` field, optional when adding a connection', - type=str), - } - subparsers = ( - { - 'func': backfill, - 'help': "Run subsections of a DAG for a specified date range", - 'args': ( - 'dag_id', 'task_regex', 'start_date', 'end_date', - 'mark_success', 'local', 'donot_pickle', 'include_adhoc', - 'bf_ignore_dependencies', 'bf_ignore_first_depends_on_past', - 'subdir', 'pool', 'dry_run') - }, { - 'func': list_tasks, - 'help': "List the tasks within a DAG", - 'args': ('dag_id', 'tree', 'subdir'), - }, { - 'func': clear, - 'help': "Clear a set of task instance, as if they never ran", - 'args': ( - 'dag_id', 'task_regex', 'start_date', 'end_date', 'subdir', - 'upstream', 'downstream', 'no_confirm', 'only_failed', - 'only_running', 'exclude_subdags'), - }, { - 'func': pause, - 'help': "Pause a DAG", - 'args': ('dag_id', 'subdir'), - }, { - 'func': unpause, - 'help': "Resume a paused DAG", - 'args': ('dag_id', 'subdir'), - }, { - 'func': trigger_dag, - 'help': "Trigger a DAG run", - 'args': ('dag_id', 'subdir', 'run_id', 'conf', 'exec_date'), - }, { - 'func': pool, - 'help': "CRUD operations on pools", - "args": ('pool_set', 'pool_get', 'pool_delete'), - }, { - 'func': variables, - 'help': "CRUD operations on variables", - "args": ('set', 'get', 'json', 'default', 'var_import', 'var_export', 'var_delete'), - }, { - 'func': kerberos, - 'help': "Start a kerberos ticket renewer", - 'args': ('principal', 'keytab', 'pid', - 'daemon', 'stdout', 
'stderr', 'log_file'), - }, { - 'func': render, - 'help': "Render a task instance's template(s)", - 'args': ('dag_id', 'task_id', 'execution_date', 'subdir'), - }, { - 'func': run, - 'help': "Run a single task instance", - 'args': ( - 'dag_id', 'task_id', 'execution_date', 'subdir', - 'mark_success', 'force', 'pool', 'cfg_path', - 'local', 'raw', 'ignore_all_dependencies', 'ignore_dependencies', - 'ignore_depends_on_past', 'ship_dag', 'pickle', 'job_id', 'kubernetes_mode'), - }, { - 'func': initdb, - 'help': "Initialize the metadata database", - 'args': tuple(), - }, { - 'func': list_dags, - 'help': "List all the DAGs", - 'args': ('subdir', 'report'), - }, { - 'func': dag_state, - 'help': "Get the status of a dag run", - 'args': ('dag_id', 'execution_date', 'subdir'), - }, { - 'func': task_failed_deps, - 'help': ( - "Returns the unmet dependencies for a task instance from the perspective " - "of the scheduler. In other words, why a task instance doesn't get " - "scheduled and then queued by the scheduler, and then run by an " - "executor)."), - 'args': ('dag_id', 'task_id', 'execution_date', 'subdir'), - }, { - 'func': task_state, - 'help': "Get the status of a task instance", - 'args': ('dag_id', 'task_id', 'execution_date', 'subdir'), - }, { - 'func': serve_logs, - 'help': "Serve logs generate by worker", - 'args': tuple(), - }, { - 'func': test, - 'help': ( - "Test a task instance. This will run a task without checking for " - "dependencies or recording it's state in the database."), - 'args': ( - 'dag_id', 'task_id', 'execution_date', 'subdir', 'dry_run', - 'task_params'), - }, { - 'func': webserver, - 'help': "Start a Airflow webserver instance", - 'args': ('port', 'workers', 'workerclass', 'worker_timeout', 'hostname', - 'pid', 'daemon', 'stdout', 'stderr', 'access_logfile', - 'error_logfile', 'log_file', 'ssl_cert', 'ssl_key', 'debug'), - }, { - 'func': resetdb, - 'help': "Burn down and rebuild the metadata database", - 'args': ('yes',), - }, { - 'func': upgradedb, - 'help': "Upgrade the metadata database to latest version", - 'args': tuple(), - }, { - 'func': scheduler, - 'help': "Start a scheduler instance", - 'args': ('dag_id_opt', 'subdir', 'run_duration', 'num_runs', - 'do_pickle', 'pid', 'daemon', 'stdout', 'stderr', - 'log_file'), - }, { - 'func': worker, - 'help': "Start a Celery worker node", - 'args': ('do_pickle', 'queues', 'concurrency', - 'pid', 'daemon', 'stdout', 'stderr', 'log_file'), - }, { - 'func': flower, - 'help': "Start a Celery Flower", - 'args': ('flower_hostname', 'flower_port', 'flower_conf', 'broker_api', - 'pid', 'daemon', 'stdout', 'stderr', 'log_file'), - }, { - 'func': version, - 'help': "Show the version", - 'args': tuple(), - }, { - 'func': connections, - 'help': "List/Add/Delete connections", - 'args': ('list_connections', 'add_connection', 'delete_connection', - 'conn_id', 'conn_uri', 'conn_extra'), - }, - ) - subparsers_dict = {sp['func'].__name__: sp for sp in subparsers} - dag_subparsers = ( - 'list_tasks', 'backfill', 'test', 'run', 'pause', 'unpause') - - @classmethod - def get_parser(cls, dag_parser=False): - parser = argparse.ArgumentParser() - subparsers = parser.add_subparsers( - help='sub-command help', dest='subcommand') - subparsers.required = True - - subparser_list = cls.dag_subparsers if dag_parser else cls.subparsers_dict.keys() - for sub in subparser_list: - sub = cls.subparsers_dict[sub] - sp = subparsers.add_parser(sub['func'].__name__, help=sub['help']) - for arg in sub['args']: - if 'dag_id' in arg and dag_parser: - continue - 
arg = cls.args[arg] - kwargs = { - f: getattr(arg, f) - for f in arg._fields if f != 'flags' and getattr(arg, f)} - sp.add_argument(*arg.flags, **kwargs) - sp.set_defaults(func=sub['func']) - return parser - - -def get_parser(): - return CLIFactory.get_parser()
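The deleted cli_factory.py above drove argparse from a single table of Arg namedtuples: `Arg.__new__.__defaults__` makes every field after `flags` optional, and `get_parser` forwards only the fields that were actually set. A minimal, self-contained sketch of that pattern follows; the subcommand and arguments are illustrative, not Airflow's real CLI.

import argparse
from collections import namedtuple

Arg = namedtuple(
    'Arg', ['flags', 'help', 'action', 'default', 'nargs', 'type', 'choices', 'metavar'])
# Every field after 'flags' defaults to None, so declarations stay terse.
Arg.__new__.__defaults__ = (None,) * (len(Arg._fields) - 1)

ARGS = {
    'dag_id': Arg(('dag_id',), 'The id of the dag'),
    'dry_run': Arg(('-dr', '--dry_run'), 'Perform a dry run', 'store_true'),
}

def get_parser():
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest='subcommand')
    sp = subparsers.add_parser('example', help='demo subcommand')
    for arg in ARGS.values():
        # Forward only the fields that were set, mirroring the factory's
        # dict comprehension over arg._fields.
        kwargs = {f: getattr(arg, f)
                  for f in arg._fields if f != 'flags' and getattr(arg, f)}
        sp.add_argument(*arg.flags, **kwargs)
    return parser

print(get_parser().parse_args(['example', 'my_dag', '--dry_run']))
# -> Namespace(subcommand='example', dag_id='my_dag', dry_run=True)

One subtlety the original carries: the truthiness test `getattr(arg, f)` drops any field explicitly set to a falsy value, so an argument like 'yes' that passes default=False effectively relies on the store_true action to supply that default.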
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/executors/kubernetes_executor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index a5aa1e1..49993a8 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -1,21 +1,22 @@ -# -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 # -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
import base64 -import os import multiprocessing -import six from queue import Queue from dateutil import parser from uuid import uuid4 @@ -31,13 +32,12 @@ from airflow.models import TaskInstance, KubeResourceVersion from airflow.utils.state import State from airflow import configuration, settings from airflow.exceptions import AirflowConfigException -from airflow.contrib.kubernetes.pod import Pod, Resources from airflow.utils.log.logging_mixin import LoggingMixin class KubernetesExecutorConfig: - - def __init__(self, image=None, request_memory=None, request_cpu=None, limit_memory=None, limit_cpu=None): + def __init__(self, image=None, request_memory=None, request_cpu=None, + limit_memory=None, limit_cpu=None): self.image = image self.request_memory = request_memory self.request_cpu = request_cpu @@ -45,10 +45,16 @@ class KubernetesExecutorConfig: self.limit_cpu = limit_cpu def __repr__(self): - return "{}(image={}, request_memory={} ,request_cpu={}, limit_memory={}, limit_cpu={})".format( - KubernetesExecutorConfig.__name__, - self.image, self.request_memory, self.request_cpu, self.limit_memory,self.limit_cpu - ) + return "{}(image={}, request_memory={} ,request_cpu={}, limit_memory={}, " \ + "limit_cpu={})"\ + .format( + KubernetesExecutorConfig.__name__, + self.image, + self.request_memory, + self.request_cpu, + self.limit_memory, + self.limit_cpu + ) @staticmethod def from_dict(obj): @@ -56,7 +62,8 @@ class KubernetesExecutorConfig: return KubernetesExecutorConfig() if not isinstance(obj, dict): - raise TypeError("Cannot convert a non-dictionary object into a KubernetesExecutorConfig") + raise TypeError( + "Cannot convert a non-dictionary object into a KubernetesExecutorConfig") namespaced = obj.get(Executors.KubernetesExecutor, {}) @@ -125,25 +132,33 @@ class KubeConfig: # Optionally, the directory in the git repository containing the dags self.git_subpath = self.safe_get(self.kubernetes_section, 'git_subpath', '') - # Optionally a user may supply a `git_user` and `git_password` for private repositories + # Optionally a user may supply a `git_user` and `git_password` for private + # repositories self.git_user = self.safe_get(self.kubernetes_section, 'git_user', None) self.git_password = self.safe_get(self.kubernetes_section, 'git_password', None) - # NOTE: The user may optionally use a volume claim to mount a PV containing DAGs directly - self.dags_volume_claim = self.safe_get(self.kubernetes_section, 'dags_volume_claim', None) + # NOTE: The user may optionally use a volume claim to mount a PV containing + # DAGs directly + self.dags_volume_claim = self.safe_get(self.kubernetes_section, + 'dags_volume_claim', None) - # This prop may optionally be set for PV Claims and is used to locate DAGs on a SubPath + # This prop may optionally be set for PV Claims and is used to locate DAGs on a + # SubPath self.dags_volume_subpath = self.safe_get( self.kubernetes_section, 'dags_volume_subpath', None) - # The Kubernetes Namespace in which the Scheduler and Webserver reside. Note that if your + # The Kubernetes Namespace in which the Scheduler and Webserver reside. Note + # that if your # cluster has RBAC enabled, your scheduler may need service account permissions to # create, watch, get, and delete pods in this namespace. - self.kube_namespace = self.safe_get(self.kubernetes_section, 'namespace', 'default') - # The Kubernetes Namespace in which pods will be created by the executor. 
Note that if your + self.kube_namespace = self.safe_get(self.kubernetes_section, 'namespace', + 'default') + # The Kubernetes Namespace in which pods will be created by the executor. Note + # that if your # cluster has RBAC enabled, your workers may need service account permissions to # interact with cluster components. - self.executor_namespace = self.safe_get(self.kubernetes_section, 'namespace', 'default') + self.executor_namespace = self.safe_get(self.kubernetes_section, 'namespace', + 'default') # Task secrets managed by KubernetesExecutor. self.gcp_service_account_keys = self.safe_get( self.kubernetes_section, 'gcp_service_account_keys', None) @@ -162,15 +177,18 @@ class KubeConfig: self.git_sync_init_container_name = self.safe_get( self.kubernetes_section, 'git_sync_init_container_name', 'git-sync-clone') - # The worker pod may optionally have a valid Airflow config loaded via a configmap - self.airflow_configmap = self.safe_get(self.kubernetes_section, 'airflow_configmap', None) + # The worker pod may optionally have a valid Airflow config loaded via a + # configmap + self.airflow_configmap = self.safe_get(self.kubernetes_section, + 'airflow_configmap', None) self._validate() def _validate(self): if not self.dags_volume_claim and (not self.git_repo or not self.git_branch): raise AirflowConfigException( - "In kubernetes mode the following must be set in the `kubernetes` config section: " + "In kubernetes mode the following must be set in the `kubernetes` " + "config section: " "`dags_volume_claim` or `git_repo and git_branch` ") @@ -196,7 +214,7 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object): def _run(self, kube_client, resource_version): self.log.info( "Event: and now my watch begins starting at resource_version: {}" - .format(resource_version)) + .format(resource_version)) watcher = watch.Watch() kwargs = {"label_selector": "airflow-slave"} @@ -204,10 +222,12 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object): kwargs["resource_version"] = resource_version last_resource_version = None - for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace, **kwargs): + for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace, + **kwargs): task = event['object'] self.log.info( - "Event: {} had an event of type {}".format(task.metadata.name, event['type'])) + "Event: {} had an event of type {}".format(task.metadata.name, + event['type'])) self.process_status( task.metadata.name, task.status.phase, task.metadata.labels, task.metadata.resource_version @@ -224,7 +244,7 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object): self.watcher_queue.put((pod_id, State.FAILED, labels, resource_version)) elif status == 'Succeeded': self.log.info("Event: {} Succeeded".format(pod_id)) - self.watcher_queue.put((pod_id, None, labels, resource_version)) + self.watcher_queue.put((pod_id, State.SUCCESS, labels, resource_version)) elif status == 'Running': self.log.info("Event: {} is Running".format(pod_id)) else: @@ -250,7 +270,8 @@ class AirflowKubernetesScheduler(LoggingMixin, object): def _make_kube_watcher(self): resource_version = KubeResourceVersion.get_current_resource_version(self._session) - watcher = KubernetesJobWatcher(self.namespace, self.watcher_queue, resource_version) + watcher = KubernetesJobWatcher(self.namespace, self.watcher_queue, + resource_version) watcher.start() return watcher @@ -278,7 +299,7 @@ class AirflowKubernetesScheduler(LoggingMixin, object): self.log.debug("k8s: 
launching image {}".format(self.kube_config.kube_image)) pod = self.worker_configuration.make_pod( namespace=self.namespace, pod_id=self._create_pod_id(dag_id, task_id), - dag_id=dag_id, task_id=task_id, + dag_id=dag_id, task_id=task_id, execution_date=self._datetime_to_label_safe_datestring(execution_date), airflow_command=command, kube_executor_config=kube_executor_config ) @@ -299,7 +320,7 @@ class AirflowKubernetesScheduler(LoggingMixin, object): """ The sync function checks the status of all currently running kubernetes jobs. - If a job is completed, it's status is placed in the result queue to + If a job is completed, it's status is placed in the result queue to be sent back to the scheduler. :return: @@ -313,8 +334,8 @@ class AirflowKubernetesScheduler(LoggingMixin, object): pod_id, state, labels, resource_version = self.watcher_queue.get() self.log.info( "Attempting to finish pod; pod_id: {}; state: {}; labels: {}" - .format(pod_id, state, labels)) - key = self._labels_to_key(labels) + .format(pod_id, state, labels)) + key = self._labels_to_key(labels=labels) if key: self.log.debug("finishing job {} - {} ({})".format(key, state, pod_id)) self.result_queue.put((key, state, pod_id, resource_version)) @@ -322,8 +343,10 @@ class AirflowKubernetesScheduler(LoggingMixin, object): @staticmethod def _strip_unsafe_kubernetes_special_chars(string): """ - Kubernetes only supports lowercase alphanumeric characters and "-" and "." in the pod name - However, there are special rules about how "-" and "." can be used so let's only keep + Kubernetes only supports lowercase alphanumeric characters and "-" and "." in + the pod name + However, there are special rules about how "-" and "." can be used so let's + only keep alphanumeric chars see here for detail: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/ @@ -335,7 +358,8 @@ class AirflowKubernetesScheduler(LoggingMixin, object): @staticmethod def _make_safe_pod_id(safe_dag_id, safe_task_id, safe_uuid): """ - Kubernetes pod names must be <= 253 chars and must pass the following regex for validation + Kubernetes pod names must be <= 253 chars and must pass the following regex for + validation "^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$" :param safe_dag_id: a dag_id with only alphanumeric characters @@ -347,37 +371,45 @@ class AirflowKubernetesScheduler(LoggingMixin, object): safe_key = safe_dag_id + safe_task_id - safe_pod_id = safe_key[:MAX_POD_ID_LEN-len(safe_uuid)-1] + "-" + safe_uuid + safe_pod_id = safe_key[:MAX_POD_ID_LEN - len(safe_uuid) - 1] + "-" + safe_uuid return safe_pod_id @staticmethod def _create_pod_id(dag_id, task_id): - safe_dag_id = AirflowKubernetesScheduler._strip_unsafe_kubernetes_special_chars(dag_id) - safe_task_id = AirflowKubernetesScheduler._strip_unsafe_kubernetes_special_chars(task_id) - safe_uuid = AirflowKubernetesScheduler._strip_unsafe_kubernetes_special_chars(uuid4().hex) - return AirflowKubernetesScheduler._make_safe_pod_id(safe_dag_id, safe_task_id, safe_uuid) + safe_dag_id = AirflowKubernetesScheduler._strip_unsafe_kubernetes_special_chars( + dag_id) + safe_task_id = AirflowKubernetesScheduler._strip_unsafe_kubernetes_special_chars( + task_id) + safe_uuid = AirflowKubernetesScheduler._strip_unsafe_kubernetes_special_chars( + uuid4().hex) + return AirflowKubernetesScheduler._make_safe_pod_id(safe_dag_id, safe_task_id, + safe_uuid) @staticmethod def _label_safe_datestring_to_datetime(string): """ - Kubernetes doesn't permit ":" in labels. 
ISO datetime format uses ":" but not "_", let's + Kubernetes doesn't permit ":" in labels. ISO datetime format uses ":" but not + "_", let's replace ":" with "_" :param string: string :return: datetime.datetime object """ - return parser.parse(string.replace("_", ":")) + print("unparsing date!") + + return parser.parse(string.replace('_plus_', '+').replace("_", ":")) @staticmethod def _datetime_to_label_safe_datestring(datetime_obj): """ - Kubernetes doesn't like ":" in labels, since ISO datetime format uses ":" but not "_" let's + Kubernetes doesn't like ":" in labels, since ISO datetime format uses ":" but + not "_" let's replace ":" with "_" :param datetime_obj: datetime.datetime object :return: ISO-like string representing the datetime """ - return datetime_obj.isoformat().replace(":", "_") + return datetime_obj.isoformat().replace(":", "_").replace('+', '_plus_') def _labels_to_key(self, labels): try: @@ -385,15 +417,16 @@ class AirflowKubernetesScheduler(LoggingMixin, object): labels["dag_id"], labels["task_id"], self._label_safe_datestring_to_datetime(labels["execution_date"])) except Exception as e: - self.log.warn("Error while converting labels to key; labels: {}; exception: {}".format( - labels, e - )) + self.log.warn( + "Error while converting labels to key; labels: {}; exception: {}".format( + labels, e + )) return None class KubernetesExecutor(BaseExecutor, LoggingMixin): def __init__(self): - super(KubernetesExecutor, self).__init__(parallelism=PARALLELISM) + self.kube_config = KubeConfig() self.task_queue = None self._session = None self.result_queue = None @@ -403,32 +436,39 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin): def clear_not_launched_queued_tasks(self): """ - If the airflow scheduler restarts with pending "Queued" tasks, the tasks may or may not - have been launched Thus, on starting up the scheduler let's check every "Queued" task to + If the airflow scheduler restarts with pending "Queued" tasks, the tasks may or + may not + have been launched Thus, on starting up the scheduler let's check every + "Queued" task to see if it has been launched (ie: if there is a corresponding pod on kubernetes) - If it has been launched then do nothing, otherwise reset the state to "None" so the task + If it has been launched then do nothing, otherwise reset the state to "None" so + the task will be rescheduled - This will not be necessary in a future version of airflow in which there is proper support + This will not be necessary in a future version of airflow in which there is + proper support for State.LAUNCHED """ queued_tasks = self._session.query( TaskInstance).filter(TaskInstance.state == State.QUEUED).all() self.log.info( - "When executor started up, found {} queued task instances".format(len(queued_tasks))) + "When executor started up, found {} queued task instances".format( + len(queued_tasks))) for t in queued_tasks: kwargs = dict(label_selector="dag_id={},task_id={},execution_date={}".format( t.dag_id, t.task_id, - AirflowKubernetesScheduler._datetime_to_label_safe_datestring(t.execution_date) + AirflowKubernetesScheduler._datetime_to_label_safe_datestring( + t.execution_date) )) pod_list = self.kube_client.list_namespaced_pod( self.kube_config.kube_namespace, **kwargs) if len(pod_list.items) == 0: self.log.info( - "TaskInstance: {} found in queued state but was not launched, rescheduling" - .format(t)) + "TaskInstance: {} found in queued state but was not launched, " + "rescheduling" + .format(t)) self._session.query(TaskInstance).filter( TaskInstance.dag_id 
== t.dag_id, TaskInstance.task_id == t.task_id, @@ -442,17 +482,20 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin): try: return self.kube_client.create_namespaced_secret( self.kube_config.executor_namespace, kubernetes.client.V1Secret( - data={'key.json' : base64.b64encode(open(secret_path, 'r').read())}, + data={ + 'key.json': base64.b64encode(open(secret_path, 'r').read())}, metadata=kubernetes.client.V1ObjectMeta(name=secret_name))) except ApiException as e: if e.status == 409: return self.kube_client.replace_namespaced_secret( secret_name, self.kube_config.executor_namespace, kubernetes.client.V1Secret( - data={'key.json' : base64.b64encode(open(secret_path, 'r').read())}, + data={'key.json': base64.b64encode( + open(secret_path, 'r').read())}, metadata=kubernetes.client.V1ObjectMeta(name=secret_name))) self.log.exception("Exception while trying to inject secret." - "Secret name: {}, error details: {}.".format(secret_name, e)) + "Secret name: {}, error details: {}." + .format(secret_name, e)) raise # For each GCP service account key, inject it as a secret in executor @@ -460,8 +503,8 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin): # We let exceptions to pass through to users. if self.kube_config.gcp_service_account_keys: name_path_pair_list = [ - {'name' : account_spec.strip().split('=')[0], - 'path' : account_spec.strip().split('=')[1]} + {'name': account_spec.strip().split('=')[0], + 'path': account_spec.strip().split('=')[1]} for account_spec in self.kube_config.gcp_service_account_keys.split(',')] for service_account in name_path_pair_list: _create_or_update_secret(service_account['name'], service_account['path']) @@ -473,16 +516,17 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin): self.result_queue = Queue() self.kube_client = get_kube_client() self.kube_scheduler = AirflowKubernetesScheduler( - self.kube_config, self.task_queue, self.result_queue, self._session, self.kube_client + self.kube_config, self.task_queue, self.result_queue, self._session, + self.kube_client ) self._inject_secrets() self.clear_not_launched_queued_tasks() - def execute_async(self, key, command, queue=None, executor_config=None): - self.log.info("k8s: adding task {} with command {} with executor_config {}".format( - key, command, executor_config - )) + self.log.info( + "k8s: adding task {} with command {} with executor_config {}".format( + key, command, executor_config + )) kube_executor_config = KubernetesExecutorConfig.from_dict(executor_config) self.task_queue.put((key, command, kube_executor_config)) @@ -496,7 +540,7 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin): results = self.result_queue.get() key, state, pod_id, resource_version = results last_resource_version = resource_version - self.log.info("Changing state of {}".format(results)) + self.log.info("Changing state of {} to {}".format(results, state)) self._change_state(key, state, pod_id) KubeResourceVersion.checkpoint_resource_version( @@ -510,8 +554,10 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin): if state != State.RUNNING: self.kube_scheduler.delete_pod(pod_id) try: + self.log.info("popping: {}".format(str(key))) self.running.pop(key) - except KeyError: + except KeyError as k: + print("{}".format(k)) pass self.event_buffer[key] = state (dag_id, task_id, ex_time) = key @@ -529,7 +575,3 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin): def end(self): self.log.info('ending kube executor') self.task_queue.join() - - def execute_async(self, key, command, queue=None): - self.logger.info("k8s: adding 
task {} with command {}".format(key, command)) - self.task_queue.put((key, command)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/__init__.py b/airflow/contrib/kubernetes/__init__.py index c82f579..13a8339 100644 --- a/airflow/contrib/kubernetes/__init__.py +++ b/airflow/contrib/kubernetes/__init__.py @@ -1,14 +1,16 @@ -# -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 # -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/kube_client.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py index 9603963..d1a63a2 100644 --- a/airflow/contrib/kubernetes/kube_client.py +++ b/airflow/contrib/kubernetes/kube_client.py @@ -1,26 +1,31 @@ -# -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 # -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. -def get_kube_client(in_cluster=True): - # TODO: This should also allow people to point to a cluster. - +def _load_kube_config(in_cluster): from kubernetes import config, client - if in_cluster: config.load_incluster_config() return client.CoreV1Api() else: - NotImplementedError( - "Running kubernetes jobs from not within the cluster is not supported at this time") + config.load_kube_config() + return client.CoreV1Api() + + +def get_kube_client(in_cluster=True): + # TODO: This should also allow people to point to a cluster. + return _load_kube_config(in_cluster) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/kubernetes_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_factory.py b/airflow/contrib/kubernetes/kubernetes_factory.py deleted file mode 100644 index 715075a..0000000 --- a/airflow/contrib/kubernetes/kubernetes_factory.py +++ /dev/null @@ -1,79 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and - -from airflow.contrib.kubernetes.kubernetes_request_factory import KubernetesRequestFactory - - -class KubernetesResourceBuilder: - def __init__( - self, - image, - cmds, - args, - namespace, - kub_req_factory=None - ): - # type: (str, list, str, KubernetesRequestFactory) -> KubernetesResourceBuilder - - self.image = image - self.args = args - self.cmds = cmds - self.kub_req_factory = kub_req_factory - self.namespace = namespace - self.envs = {} - self.labels = {} - self.secrets = {} - self.node_selectors = [] - self.name = None - self.image_pull_policy = None - - def add_env_variables(self, env): - self.envs = env - - def add_secret(self, secret): - self.secrets = self.secrets + [secret] - - def add_secrets(self, secrets): - self.secrets = secrets - - def add_labels(self, labels): - self.labels = labels - - def add_name(self, name): - self.name = name - - def set_namespace(self, namespace): - self.namespace = namespace - - def set_image_pull_policy(self, image_pull_policy): - self.image_pull_policy = image_pull_policy - - def launch(self): - """ - Launches the pod synchronously and waits for completion. - """ - k8s_beta = self._kube_client() - req = self.kub_req_factory.create(self) - self.logger.info(json.dumps(req)) - resp = k8s_beta.create_namespaced_pod(body=req, namespace=self.namespace) - self.logger.info("Job created. 
status='%s', yaml:\n%s", - str(resp.status), str(req)) - - def _kube_client(self): - config.load_incluster_config() - return client.CoreV1Api() - -class KubernetesPodBuilder(KubernetesResourceBuilder): - def __init__(self, image, cmds, namespace, kub_req_factory=None): - # type: (str, list, str, KubernetesRequestFactory) -> KubernetesPodBuilder - KubernetesResourceBuilder.__init__(self, image, cmds, namespace, kub_req_factory) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/kubernetes_helper.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_helper.py b/airflow/contrib/kubernetes/kubernetes_helper.py deleted file mode 100644 index cad7917..0000000 --- a/airflow/contrib/kubernetes/kubernetes_helper.py +++ /dev/null @@ -1,45 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import yaml -from kubernetes import client, config -from kubernetes.client.rest import ApiException -import kubernetes - - -class KubernetesHelper(object): - def __init__(self): - config.load_incluster_config() - self.job_api = client.BatchV1Api() - self.pod_api = client.CoreV1Api() - - def launch_job(self, pod_info, namespace): - dep = yaml.load(pod_info) - resp = self.job_api.create_namespaced_job(body=dep, namespace=namespace) - return resp - - def get_status(self, pod_id, namespace): - return self.job_api.read_namespaced_job(pod_id, namespace).status - - def delete_job(self, job_id, namespace): - body = client.V1DeleteOptions() - self.job_api.delete_namespaced_job(name=job_id, namespace=namespace, body=body) - - def delete_pod(self, pod_id, namespace): - body = client.V1DeleteOptions() - try: - self.pod_api.delete_namespaced_pod(pod_id, namespace, body=body) - except ApiException as e: - if e.status != 404: - raise http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/kubernetes_job_builder.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_job_builder.py b/airflow/contrib/kubernetes/kubernetes_job_builder.py deleted file mode 100644 index 4171dbc..0000000 --- a/airflow/contrib/kubernetes/kubernetes_job_builder.py +++ /dev/null @@ -1,75 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and - -from kubernetes import client, config -import json -import logging - - -class KubernetesJobBuilder: - def __init__( - self, - image, - cmds, - namespace, - kub_req_factory=None - ): - self.image = image - self.cmds = cmds - self.kub_req_factory = kub_req_factory - self.namespace = namespace - self.logger = logging.getLogger(self.__class__.__name__) - self.envs = {} - self.labels = {} - self.secrets = {} - self.node_selectors = [] - self.name = None - - def add_env_variables(self, env): - self.envs = env - - def add_secrets(self, secrets): - self.secrets = secrets - - def add_labels(self, labels): - self.labels = labels - - def add_name(self, name): - self.name = name - - def set_namespace(self, namespace): - self.namespace = namespace - - def launch(self): - """ - Launches the pod synchronously and waits for completion. - """ - k8s_beta = self._kube_client() - req = self.kub_req_factory.create(self) - print(json.dumps(req)) - resp = k8s_beta.create_namespaced_job(body=req, namespace=self.namespace) - self.logger.info("Job created. status='%s', yaml:\n%s", - str(resp.status), str(req)) - - def _execution_finished(self): - k8s_beta = self._kube_client() - resp = k8s_beta.read_namespaced_job_status(self.name, namespace=self.namespace) - self.logger.info('status : ' + str(resp.status)) - if resp.status.phase == 'Failed': - raise Exception("Job " + self.name + " failed!") - return resp.status.phase != 'Running' - - @staticmethod - def _kube_client(): - config.load_incluster_config() - return client.BatchV1Api() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py index 9921696..13a8339 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/__init__.py @@ -1,12 +1,16 @@ -# -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 # -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
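The two helpers deleted above, kubernetes_helper.py and kubernetes_job_builder.py, shared one flow: load in-cluster credentials, render a Job body from a YAML template, and submit it through BatchV1Api. A condensed sketch of that flow under the same assumptions (the image and template come from the deleted code) is below. The polling check here reads the succeeded/failed counters of V1JobStatus rather than the `phase` attribute the deleted `_execution_finished` referenced, since pod status objects define `phase` but Job status objects do not.

import yaml
from kubernetes import client, config

JOB_YAML = """\
apiVersion: batch/v1
kind: Job
metadata:
  name: demo-job
spec:
  template:
    spec:
      containers:
      - name: base
        image: airflow-slave:latest
        command: ["/bin/sh", "-c", "sleep 25"]
      restartPolicy: Never
"""

def launch_job(namespace='default'):
    config.load_incluster_config()   # in-cluster service-account credentials
    batch_api = client.BatchV1Api()
    body = yaml.safe_load(JOB_YAML)  # safe_load avoids executing arbitrary tags
    return batch_api.create_namespaced_job(body=body, namespace=namespace)

def job_finished(name, namespace='default'):
    batch_api = client.BatchV1Api()
    status = batch_api.read_namespaced_job_status(name, namespace=namespace).status
    # V1JobStatus tracks pod counts; the Job is done once any pod has
    # terminated, successfully or not.
    return bool(status.succeeded or status.failed)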
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/kubernetes_request_factory/job_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/job_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/job_request_factory.py deleted file mode 100644 index a06b434..0000000 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/job_request_factory.py +++ /dev/null @@ -1,57 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -import logging -import yaml -from .kubernetes_request_factory import KubernetesRequestFactory, KubernetesRequestFactoryHelper as kreq - - -class SimpleJobRequestFactory(KubernetesRequestFactory): - """ - Request generator for a simple pod. - """ - - def __init__(self): - super(SimpleJobRequestFactory, self).__init__() - - _yaml = """apiVersion: batch/v1 -kind: Job -metadata: - name: name -spec: - template: - metadata: - name: name - spec: - containers: - - name: base - image: airflow-slave:latest - command: ["/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25"] - restartPolicy: Never - """ - - def create_body(self, pod): - req = yaml.load(self._yaml) - kreq.extract_name(pod, req) - kreq.extract_labels(pod, req) - kreq.extract_image(pod, req) - kreq.extract_cmds(pod, req) - kreq.extract_args(pod, req) - if len(pod.node_selectors) > 0: - kreq.extract_node_selector(pod, req) - kreq.extract_secrets(pod, req) - logging.info("attaching volume mounts") - kreq.attach_volume_mounts(req) - return req - - def after_create(self, body, pod): - pass http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py index b5ab074..cbf3fce 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py @@ -1,25 +1,29 @@ -# -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 # -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. from abc import ABCMeta, abstractmethod import six -class KubernetesRequestFactory(): + +class KubernetesRequestFactory: """ - Create requests to be sent to kube API. Extend this class to talk to kubernetes and generate - your specific resources. This is equivalent of generating yaml files that can be used by - `kubectl` + Create requests to be sent to kube API. + Extend this class to talk to kubernetes and generate your specific resources. + This is equivalent of generating yaml files that can be used by `kubectl` """ __metaclass__ = ABCMeta @@ -32,37 +36,23 @@ class KubernetesRequestFactory(): """ pass - @abstractmethod - def after_create(self, body, pod): - """ - Is called after the create to augment the body. - - :param body: The request body - :param pod: The pod - """ - pass - - -class KubernetesRequestFactoryHelper(object): - """ - Helper methods to build a request for kubernetes - """ - @staticmethod - def extract_image_pull_policy(pod, req): - req['spec']['containers'][0]['imagePullPolicy'] = pod.image_pull_policy - @staticmethod def extract_image(pod, req): req['spec']['containers'][0]['image'] = pod.image @staticmethod + def extract_image_pull_policy(pod, req): + if pod.image_pull_policy: + req['spec']['containers'][0]['imagePullPolicy'] = pod.image_pull_policy + + @staticmethod def add_secret_to_env(env, secret): env.append({ - 'name':secret.deploy_target, - 'valueFrom':{ - 'secretKeyRef':{ - 'name':secret.secret, - 'key':secret.key + 'name': secret.deploy_target, + 'valueFrom': { + 'secretKeyRef': { + 'name': secret.secret, + 'key': secret.key } } }) @@ -83,8 +73,12 @@ class KubernetesRequestFactoryHelper(object): @staticmethod def extract_node_selector(pod, req): - req['spec']['nodeSelector'] = pod.node_selectors + if len(pod.node_selectors) > 0: + req['spec']['nodeSelector'] = pod.node_selectors + @staticmethod + def attach_volumes(pod, req): + req['spec']['volumes'] = pod.volumes @staticmethod def attach_volume_mounts(pod, req): @@ -106,14 +100,14 @@ class KubernetesRequestFactoryHelper(object): for idx, vol in enumerate(vol_secrets): vol_id = 'secretvol' + str(idx) req['spec']['containers'][0]['volumeMounts'].append({ - 'mountPath':vol.deploy_target, - 'name':vol_id, - 'readOnly':True + 'mountPath': vol.deploy_target, + 'name': vol_id, + 'readOnly': True }) req['spec']['volumes'].append({ - 'name':vol_id, - 'secret':{ - 'secretName':vol.secret + 'name': vol_id, + 'secret': { + 'secretName': vol.secret } }) @@ -123,7 +117,7 @@ class KubernetesRequestFactoryHelper(object): if len(pod.envs) > 0 or len(env_secrets) > 0: env = [] for k in pod.envs.keys(): - env.append({'name':k, 'value':pod.envs[k]}) + env.append({'name': k, 'value': pod.envs[k]}) for secret in env_secrets: 
KubernetesRequestFactory.add_secret_to_env(env, secret) req['spec']['containers'][0]['env'] = env @@ -138,16 +132,20 @@ class KubernetesRequestFactoryHelper(object): if pod.resources.has_requests(): req['spec']['containers'][0]['resources']['requests'] = {} if pod.resources.request_memory: - req['spec']['containers'][0]['resources']['requests']['memory'] = pod.resources.request_memory + req['spec']['containers'][0]['resources']['requests'][ + 'memory'] = pod.resources.request_memory if pod.resources.request_cpu: - req['spec']['containers'][0]['resources']['requests']['cpu'] = pod.resources.request_cpu + req['spec']['containers'][0]['resources']['requests'][ + 'cpu'] = pod.resources.request_cpu if pod.resources.has_limits(): req['spec']['containers'][0]['resources']['limits'] = {} if pod.resources.request_memory: - req['spec']['containers'][0]['resources']['limits']['memory'] = pod.resources.limit_memory + req['spec']['containers'][0]['resources']['limits'][ + 'memory'] = pod.resources.limit_memory if pod.resources.request_cpu: - req['spec']['containers'][0]['resources']['limits']['cpu'] = pod.resources.limit_cpu + req['spec']['containers'][0]['resources']['limits'][ + 'cpu'] = pod.resources.limit_cpu @staticmethod def extract_init_containers(pod, req): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py index ea6b94b..44b05dd 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py @@ -1,20 +1,23 @@ -# -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 # -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
index ea6b94b..44b05dd 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -1,20 +1,23 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
 #
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 import yaml
-from airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory import (
-    KubernetesRequestFactory)
-from airflow.contrib.kubernetes.pod import Pod
+from airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory \
+    import KubernetesRequestFactory
 
 
 class SimplePodRequestFactory(KubernetesRequestFactory):
@@ -30,9 +33,6 @@ spec:
   - name: base
     image: airflow-slave:latest
     command: ["/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25"]
-    volumeMounts:
-    - name: shared-data
-      mountPath: "/usr/local/airflow/dags"
   restartPolicy: Never
 """
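The create() body is not visible in this hunk, so the following is only a rough sketch of the parse-template-then-mutate pattern the factory implies, using PyYAML. The skeleton below is trimmed and partly assumed from the class docstring, and the field values are illustrative:

import yaml

# Assumed kubectl-style skeleton, based on the class docstring above.
TEMPLATE = """
apiVersion: v1
kind: Pod
metadata:
  name: name
spec:
  containers:
  - name: base
    image: airflow-slave:latest
    command: ["/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25"]
  restartPolicy: Never
"""

req = yaml.safe_load(TEMPLATE)
# The extract_* helpers would then overwrite fields from the Pod, e.g.:
req['metadata']['name'] = 'example-pod'
req['spec']['containers'][0]['image'] = 'airflow-worker:latest'
print(yaml.safe_dump(req, default_flow_style=False))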
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/pod.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index 1877da7..01d6760 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -1,20 +1,29 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
 #
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
 #
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 
 class Resources:
-    def __init__(self, request_memory=None, request_cpu=None, limit_memory=None, limit_cpu=None):
+    def __init__(
+            self,
+            request_memory=None,
+            request_cpu=None,
+            limit_memory=None,
+            limit_cpu=None):
         self.request_memory = request_memory
         self.request_cpu = request_cpu
         self.limit_memory = limit_memory
@@ -33,7 +42,6 @@ class Resources:
 
 class Pod:
     """
     Represents a kubernetes pod and manages execution of a single pod.
-
     :param image: The docker image
     :type image: str
     :param env: A dict containing the environment variables
@@ -46,18 +54,18 @@ class Pod:
         successful execution of the pod
     :type result: any
     """
-    pod_timeout = 3600
-
     def __init__(
             self,
             image,
             envs,
             cmds,
-            secrets,
+            args=None,
+            secrets=None,
             labels=None,
             node_selectors=None,
             name=None,
-            volumes = [],
+            volumes=None,
+            volume_mounts=None,
             namespace='default',
             result=None,
             image_pull_policy="IfNotPresent",
@@ -67,14 +75,16 @@ class Pod:
             resources=None
     ):
         self.image = image
-        self.envs = envs if envs else {}
+        self.envs = envs or {}
         self.cmds = cmds
-        self.secrets = secrets
+        self.args = args or []
+        self.secrets = secrets or []
         self.result = result
-        self.labels = labels if labels else []
+        self.labels = labels or {}
         self.name = name
-        self.volumes = volumes
-        self.node_selectors = node_selectors if node_selectors else []
+        self.volumes = volumes or []
+        self.volume_mounts = volume_mounts or []
+        self.node_selectors = node_selectors or []
         self.namespace = namespace
         self.image_pull_policy = image_pull_policy
         self.image_pull_secrets = image_pull_secrets
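A short usage sketch of the reworked constructor, assuming the patched airflow.contrib.kubernetes.pod module is importable. The point of the change is that mutable arguments now default to None and are normalised with `or`, so separate Pod instances no longer share one list or dict (values below are illustrative):

from airflow.contrib.kubernetes.pod import Pod, Resources

resources = Resources(request_memory='256Mi', request_cpu='500m',
                      limit_memory='512Mi', limit_cpu='1')
pod = Pod(
    image='airflow-worker:latest',
    envs={'AIRFLOW__CORE__EXECUTOR': 'LocalExecutor'},
    cmds=['airflow', 'run', 'example_dag', 'example_task', '2017-01-01'],
    labels={'airflow-worker': 'example'},
    resources=resources,
)

# None-valued arguments are normalised to fresh empty containers.
assert pod.secrets == [] and pod.volumes == [] and pod.args == []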
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/pod_launcher.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py
index 1903060..23d3a7c 100644
--- a/airflow/contrib/kubernetes/pod_launcher.py
+++ b/airflow/contrib/kubernetes/pod_launcher.py
@@ -1,28 +1,31 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
 #
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 import json
-
-from airflow.contrib.kubernetes.pod import Pod
-from airflow.contrib.kubernetes.kubernetes_request_factory.pod_request_factory import (
-    SimplePodRequestFactory)
+import time
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
+from datetime import datetime as dt
+from airflow.contrib.kubernetes.kubernetes_request_factory import \
+    pod_request_factory as pod_fac
 from kubernetes import watch
-from kubernetes.client import V1Pod
 from kubernetes.client.rest import ApiException
-
+from airflow import AirflowException
+from requests.exceptions import HTTPError
 from .kube_client import get_kube_client
 
@@ -34,10 +37,11 @@ class PodStatus(object):
 
 class PodLauncher(LoggingMixin):
-    def __init__(self, kube_client=None):
-        self.kube_req_factory = SimplePodRequestFactory()
-        self._client = kube_client or get_kube_client()
+    def __init__(self, kube_client=None, in_cluster=True):
+        super(PodLauncher, self).__init__()
+        self._client = kube_client or get_kube_client(in_cluster=in_cluster)
         self._watch = watch.Watch()
+        self.kube_req_factory = pod_fac.SimplePodRequestFactory()
 
     def run_pod_async(self, pod):
         req = self.kube_req_factory.create(pod)
@@ -50,35 +54,71 @@ class PodLauncher(LoggingMixin):
                 raise
         return resp
 
-    def run_pod(self, pod):
+    def run_pod(self, pod, startup_timeout=120, get_logs=True):
         # type: (Pod) -> State
         """
         Launches the pod synchronously and waits for completion.
+        Args:
+            pod (Pod):
+            startup_timeout (int): Timeout for startup of the pod (if pod is pending
+                for too long, considers task a failure)
         """
         resp = self.run_pod_async(pod)
-        final_status = self._monitor_pod(pod)
+        curr_time = dt.now()
+        if resp.status.start_time is None:
+            while self.pod_not_started(pod):
+                delta = dt.now() - curr_time
+                if delta.seconds >= startup_timeout:
+                    raise AirflowException("Pod took too long to start")
+                time.sleep(1)
+                self.log.debug('Pod not yet started')
+
+        final_status = self._monitor_pod(pod, get_logs)
         return final_status
 
-    def _monitor_pod(self, pod):
+    def _monitor_pod(self, pod, get_logs):
         # type: (Pod) -> State
-        for event in self._watch.stream(self.read_pod(pod), pod.namespace):
-            status = self._task_status(event)
-            if status == State.SUCCESS or status == State.FAILED:
-                return status
+
+        if get_logs:
+            logs = self._client.read_namespaced_pod_log(
+                name=pod.name,
+                namespace=pod.namespace,
+                follow=True,
+                tail_lines=10,
+                _preload_content=False)
+            for line in logs:
+                self.log.info(line)
+        else:
+            while self.pod_is_running(pod):
+                self.log.info("Pod {} has state {}".format(pod.name, State.RUNNING))
+                time.sleep(2)
+        return self._task_status(self.read_pod(pod))
 
     def _task_status(self, event):
         # type: (V1Pod) -> State
-        task = event['object']
         self.log.info(
-            "Event: {} had an event of type {}".format(task.metadata.name,
-                                                       event['type']))
-        status = self.process_status(task.metadata.name, task.status.phase)
+            "Event: {} had an event of type {}".format(event.metadata.name,
+                                                       event.status.phase))
+        status = self.process_status(event.metadata.name, event.status.phase)
         return status
 
+    def pod_not_started(self, pod):
+        state = self._task_status(self.read_pod(pod))
+        return state == State.QUEUED
+
+    def pod_is_running(self, pod):
+        state = self._task_status(self.read_pod(pod))
+        return state != State.SUCCESS and state != State.FAILED
+
     def read_pod(self, pod):
-        return self._client.read_namespaced_pod(pod.name, pod.namespace)
+        try:
+            return self._client.read_namespaced_pod(pod.name, pod.namespace)
+        except HTTPError as e:
+            raise AirflowException("There was an error reading the kubernetes API: {}"
+                                   .format(e))
 
     def process_status(self, job_id, status):
+        status = status.lower()
         if status == PodStatus.PENDING:
             return State.QUEUED
         elif status == PodStatus.FAILED:
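The new startup handling in run_pod reduces to a poll-with-deadline loop. A standalone sketch of just that logic, where read_phase is a hypothetical stand-in for self._task_status(self.read_pod(pod)) and RuntimeError stands in for AirflowException:

import time
from datetime import datetime as dt

def wait_until_started(read_phase, startup_timeout=120):
    """Poll the pod phase until it leaves 'queued' (kubernetes PENDING),
    failing if that takes longer than startup_timeout seconds."""
    start = dt.now()
    while read_phase() == 'queued':
        if (dt.now() - start).seconds >= startup_timeout:
            raise RuntimeError("Pod took too long to start")
        time.sleep(1)

# Toy usage: a phase source that reports 'queued' twice, then 'running'.
phases = iter(['queued', 'queued', 'running', 'running'])
wait_until_started(lambda: next(phases), startup_timeout=120)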
AirflowException("There was an error reading the kubernetes API: {}" + .format(e)) def process_status(self, job_id, status): + status = status.lower() if status == PodStatus.PENDING: return State.QUEUED elif status == PodStatus.FAILED: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/secret.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/contrib/kubernetes/secret.py index a798a24..23bfacc 100644 --- a/airflow/contrib/kubernetes/secret.py +++ b/airflow/contrib/kubernetes/secret.py @@ -1,25 +1,29 @@ -# -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 # -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. class Secret: - """Defines Kubernetes Secret Containers""" + """Defines Kubernetes Secret Volume""" def __init__(self, deploy_type, deploy_target, secret, key): - """Initialize a Kubernetes Secret Object. Used to track requested secrets from the user. - - :param deploy_type: The type of secret deploy in Kubernetes, either `env` or `volume` + """Initialize a Kubernetes Secret Object. Used to track requested secrets from + the user. + :param deploy_type: The type of secret deploy in Kubernetes, either `env` or + `volume` :type deploy_type: ``str`` :param deploy_target: The environment variable to be created in the worker. :type deploy_target: ``str`` http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/worker_configuration.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py index f59576a..5cb92ef 100644 --- a/airflow/contrib/kubernetes/worker_configuration.py +++ b/airflow/contrib/kubernetes/worker_configuration.py @@ -1,16 +1,19 @@ -# -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9a87a07/airflow/contrib/kubernetes/worker_configuration.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py
index f59576a..5cb92ef 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -1,16 +1,19 @@
-# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
 #
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 import copy
 import os
@@ -34,21 +37,21 @@ class WorkerConfiguration:
 
         # Otherwise, define a git-sync init container
         init_environment = [{
-            'name': 'GIT_SYNC_REPO',
-            'value': self.kube_config.git_repo
-        }, {
-            'name': 'GIT_SYNC_BRANCH',
-            'value': self.kube_config.git_branch
-        }, {
-            'name': 'GIT_SYNC_ROOT',
-            'value': '/tmp'
-        }, {
-            'name': 'GIT_SYNC_DEST',
-            'value': 'dags'
-        }, {
-            'name': 'GIT_SYNC_ONE_TIME',
-            'value': 'true'
-        }]
+                'name': 'GIT_SYNC_REPO',
+                'value': self.kube_config.git_repo
+            }, {
+                'name': 'GIT_SYNC_BRANCH',
+                'value': self.kube_config.git_branch
+            }, {
+                'name': 'GIT_SYNC_ROOT',
+                'value': '/tmp'
+            }, {
+                'name': 'GIT_SYNC_DEST',
+                'value': 'dags'
+            }, {
+                'name': 'GIT_SYNC_ONE_TIME',
+                'value': 'true'
+            }]
         if self.kube_config.git_user:
             init_environment.append({
                 'name': 'GIT_SYNC_USERNAME',
@@ -72,7 +75,8 @@ class WorkerConfiguration:
     def _get_volumes_and_mounts(self):
         """Determine volumes and volume mounts for Airflow workers"""
         dags_volume_name = "airflow-dags"
-        dags_path = os.path.join(self.kube_config.dags_folder, self.kube_config.git_subpath)
+        dags_path = os.path.join(self.kube_config.dags_folder,
+                                 self.kube_config.git_subpath)
         volumes = [{
             'name': dags_volume_name
         }]
@@ -101,7 +105,8 @@ class WorkerConfiguration:
 
         # A PV with the DAGs should be mounted
         if self.kube_config.dags_volume_claim:
-            volumes[0]['persistentVolumeClaim'] = {"claimName": self.kube_config.dags_volume_claim}
+            volumes[0]['persistentVolumeClaim'] = {
+                "claimName": self.kube_config.dags_volume_claim}
             if self.kube_config.dags_volume_subpath:
                 volume_mounts[0]["subPath"] = self.kube_config.dags_volume_subpath
         else:
@@ -124,7 +129,8 @@ class WorkerConfiguration:
         worker_secrets = []
         for env_var_name, obj_key_pair in six.iteritems(self.kube_config.kube_secrets):
             k8s_secret_obj, k8s_secret_key = obj_key_pair.split('=')
-            worker_secrets.append(Secret('env', env_var_name, k8s_secret_obj, k8s_secret_key))
+            worker_secrets.append(
+                Secret('env', env_var_name, k8s_secret_obj, k8s_secret_key))
         return worker_secrets
 
     def _get_image_pull_secrets(self):
@@ -133,9 +139,11 @@ class WorkerConfiguration:
             return []
         return self.kube_config.image_pull_secrets.split(',')
 
-    def make_pod(self, namespace, pod_id, dag_id, task_id, execution_date, airflow_command, kube_executor_config):
+    def make_pod(self, namespace, pod_id, dag_id, task_id, execution_date,
+                 airflow_command, kube_executor_config):
         volumes, volume_mounts = self._get_volumes_and_mounts()
-        worker_init_container_spec = self._get_init_containers(copy.deepcopy(volume_mounts))
+        worker_init_container_spec = self._get_init_containers(
+            copy.deepcopy(volume_mounts))
         resources = Resources(
             request_memory=kube_executor_config.request_memory,
             request_cpu=kube_executor_config.request_cpu,
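Finally, the kube_secrets handling in _get_secrets above: each config entry maps a worker environment variable to a '<secret-object>=<key>' pair, which is split into an env deploy_type Secret. A sketch with illustrative config values standing in for kube_config.kube_secrets:

import six
from airflow.contrib.kubernetes.secret import Secret

# Illustrative stand-in for self.kube_config.kube_secrets.
kube_secrets = {
    'SQL_ALCHEMY_CONN': 'airflow-secrets=sql_alchemy_conn',
    'FERNET_KEY': 'airflow-secrets=fernet_key',
}

worker_secrets = []
for env_var_name, obj_key_pair in six.iteritems(kube_secrets):
    # '<secret-object>=<key>' splits into the Secret name and the key within it.
    k8s_secret_obj, k8s_secret_key = obj_key_pair.split('=')
    worker_secrets.append(
        Secret('env', env_var_name, k8s_secret_obj, k8s_secret_key))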
