Repository: incubator-airflow Updated Branches: refs/heads/master 7bdf6162f -> 2b93f8888
[AIRFLOW-2567] Extract result from the kubernetes pod as Xcom Closes #3466 from mrkm4ntr/airflow-2567 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2b93f888 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2b93f888 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2b93f888 Branch: refs/heads/master Commit: 2b93f888882e72cb705c90108df85565e3f8f6d3 Parents: 7bdf616 Author: Shintaro Murakami <[email protected]> Authored: Tue Jun 19 09:54:22 2018 +0200 Committer: Fokko Driesprong <[email protected]> Committed: Tue Jun 19 09:54:22 2018 +0200 ---------------------------------------------------------------------- airflow/contrib/kubernetes/kube_client.py | 17 +++--- .../pod_request_factory.py | 57 ++++++++++++++++++ airflow/contrib/kubernetes/pod_launcher.py | 63 +++++++++++++++++--- .../operators/kubernetes_pod_operator.py | 13 +++- .../minikube/test_kubernetes_pod_operator.py | 17 +++++- 5 files changed, 148 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b93f888/airflow/contrib/kubernetes/kube_client.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py index be8d56e..8b71f41 100644 --- a/airflow/contrib/kubernetes/kube_client.py +++ b/airflow/contrib/kubernetes/kube_client.py @@ -15,21 +15,22 @@ # specific language governing permissions and limitations # under the License. from airflow.configuration import conf +from six import PY2 def _load_kube_config(in_cluster, cluster_context, config_file): from kubernetes import config, client if in_cluster: config.load_incluster_config() - return client.CoreV1Api() else: - if cluster_context is None and config_file is None: - config.load_kube_config() - return client.CoreV1Api() - else: - return client.CoreV1Api( - api_client=config.new_client_from_config(config_file=config_file, - context=cluster_context)) + config.load_kube_config(config_file=config_file, context=cluster_context) + if PY2: + # For connect_get_namespaced_pod_exec + from kubernetes.client import Configuration + configuration = Configuration() + configuration.assert_hostname = False + Configuration.set_default(configuration) + return client.CoreV1Api() def get_kube_client(in_cluster=conf.getboolean('kubernetes', 'in_cluster'), http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b93f888/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py index b41ee8a..95d6c82 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py @@ -60,3 +60,60 @@ spec: self.extract_annotations(pod, req) self.extract_affinity(pod, req) return req + + +class ExtractXcomPodRequestFactory(KubernetesRequestFactory): + + XCOM_MOUNT_PATH = '/airflow/xcom' + SIDECAR_CONTAINER_NAME = 'airflow-xcom-sidecar' + """ + Request generator for a pod with sidecar container. + """ + _yaml = """apiVersion: v1 +kind: Pod +metadata: + name: name +spec: + volumes: + - name: xcom + emptyDir: {{}} + containers: + - name: base + image: airflow-worker:latest + command: ["/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25"] + volumeMounts: + - name: xcom + mountPath: {xcomMountPath} + - name: {sidecarContainerName} + image: python:3.5-alpine + command: ["python", "-m", "http.server"] + volumeMounts: + - name: xcom + mountPath: {xcomMountPath} + restartPolicy: Never + """.format(xcomMountPath=XCOM_MOUNT_PATH, sidecarContainerName=SIDECAR_CONTAINER_NAME) + + def __init__(self): + pass + + def create(self, pod): + # type: (Pod) -> dict + req = yaml.load(self._yaml) + self.extract_name(pod, req) + self.extract_labels(pod, req) + self.extract_image(pod, req) + self.extract_image_pull_policy(pod, req) + self.extract_cmds(pod, req) + self.extract_args(pod, req) + self.extract_node_selector(pod, req) + self.extract_env_and_secrets(pod, req) + self.extract_volume_secrets(pod, req) + self.attach_volumes(pod, req) + self.attach_volume_mounts(pod, req) + self.extract_resources(pod, req) + self.extract_service_account_name(pod, req) + self.extract_init_containers(pod, req) + self.extract_image_pull_secrets(pod, req) + self.extract_annotations(pod, req) + self.extract_affinity(pod, req) + return req http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b93f888/airflow/contrib/kubernetes/pod_launcher.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py index c1c3f30..8ac5108 100644 --- a/airflow/contrib/kubernetes/pod_launcher.py +++ b/airflow/contrib/kubernetes/pod_launcher.py @@ -21,9 +21,10 @@ from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import State from datetime import datetime as dt from airflow.contrib.kubernetes.kubernetes_request_factory import \ - pod_request_factory as pod_fac + pod_request_factory as pod_factory from kubernetes import watch from kubernetes.client.rest import ApiException +from kubernetes.stream import stream as kubernetes_stream from airflow import AirflowException from requests.exceptions import HTTPError from .kube_client import get_kube_client @@ -37,12 +38,15 @@ class PodStatus(object): class PodLauncher(LoggingMixin): - def __init__(self, kube_client=None, in_cluster=True, cluster_context=None): + def __init__(self, kube_client=None, in_cluster=True, cluster_context=None, + extract_xcom=False): super(PodLauncher, self).__init__() self._client = kube_client or get_kube_client(in_cluster=in_cluster, cluster_context=cluster_context) self._watch = watch.Watch() - self.kube_req_factory = pod_fac.SimplePodRequestFactory() + self.extract_xcom = extract_xcom + self.kube_req_factory = pod_factory.ExtractXcomPodRequestFactory( + ) if extract_xcom else pod_factory.SimplePodRequestFactory() def run_pod_async(self, pod): req = self.kube_req_factory.create(pod) @@ -56,7 +60,7 @@ class PodLauncher(LoggingMixin): return resp def run_pod(self, pod, startup_timeout=120, get_logs=True): - # type: (Pod) -> State + # type: (Pod) -> (State, result) """ Launches the pod synchronously and waits for completion. Args: @@ -74,25 +78,33 @@ class PodLauncher(LoggingMixin): time.sleep(1) self.log.debug('Pod not yet started') - final_status = self._monitor_pod(pod, get_logs) - return final_status + return self._monitor_pod(pod, get_logs) def _monitor_pod(self, pod, get_logs): - # type: (Pod) -> State + # type: (Pod) -> (State, content) if get_logs: logs = self._client.read_namespaced_pod_log( name=pod.name, namespace=pod.namespace, + container='base', follow=True, tail_lines=10, _preload_content=False) for line in logs: self.log.info(line) + result = None + if self.extract_xcom: + while self.base_container_is_running(pod): + self.log.info('Container %s has state %s', pod.name, State.RUNNING) + time.sleep(2) + result = self._extract_xcom(pod) + self.log.info(result) + result = json.loads(result) while self.pod_is_running(pod): self.log.info('Pod %s has state %s', pod.name, State.RUNNING) time.sleep(2) - return self._task_status(self.read_pod(pod)) + return (self._task_status(self.read_pod(pod)), result) def _task_status(self, event): self.log.info( @@ -109,6 +121,12 @@ class PodLauncher(LoggingMixin): state = self._task_status(self.read_pod(pod)) return state != State.SUCCESS and state != State.FAILED + def base_container_is_running(self, pod): + event = self.read_pod(pod) + status = next(iter(filter(lambda s: s.name == 'base', + event.status.container_statuses)), None) + return status.state.running is not None + def read_pod(self, pod): try: return self._client.read_namespaced_pod(pod.name, pod.namespace) @@ -117,6 +135,35 @@ class PodLauncher(LoggingMixin): 'There was an error reading the kubernetes API: {}'.format(e) ) + def _extract_xcom(self, pod): + resp = kubernetes_stream(self._client.connect_get_namespaced_pod_exec, + pod.name, pod.namespace, + container=self.kube_req_factory.SIDECAR_CONTAINER_NAME, + command=['/bin/sh'], stdin=True, stdout=True, + stderr=True, tty=False, + _preload_content=False) + try: + result = self._exec_pod_command( + resp, 'cat {}/return.json'.format(self.kube_req_factory.XCOM_MOUNT_PATH)) + self._exec_pod_command(resp, 'kill -s SIGINT 1') + finally: + resp.close() + if result is None: + raise AirflowException('Failed to extract xcom from pod: {}'.format(pod.name)) + return result + + def _exec_pod_command(self, resp, command): + if resp.is_open(): + self.log.info('Running command... %s\n' % command) + resp.write_stdin(command + '\n') + while resp.is_open(): + resp.update(timeout=1) + if resp.peek_stdout(): + return resp.read_stdout() + if resp.peek_stderr(): + self.log.info(resp.read_stderr()) + break + def process_status(self, job_id, status): status = status.lower() if status == PodStatus.PENDING: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b93f888/airflow/contrib/operators/kubernetes_pod_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py index 3e4b8a3..bf656f1 100644 --- a/airflow/contrib/operators/kubernetes_pod_operator.py +++ b/airflow/contrib/operators/kubernetes_pod_operator.py @@ -72,6 +72,10 @@ class KubernetesPodOperator(BaseOperator): :type affinity: dict :param config_file: The path to the Kubernetes config file :type config_file: str + :param xcom_push: If xcom_push is True, the content of the file + /airflow/xcom/return.json in the container will also be pushed to an + XCom when the container completes. + :type xcom_push: bool """ template_fields = ('cmds', 'arguments', 'env_vars', 'config_file') @@ -103,8 +107,9 @@ class KubernetesPodOperator(BaseOperator): pod.resources = self.resources pod.affinity = self.affinity - launcher = pod_launcher.PodLauncher(kube_client=client) - final_state = launcher.run_pod( + launcher = pod_launcher.PodLauncher(kube_client=client, + extract_xcom=self.xcom_push) + (final_state, result) = launcher.run_pod( pod, startup_timeout=self.startup_timeout_seconds, get_logs=self.get_logs) @@ -112,6 +117,8 @@ class KubernetesPodOperator(BaseOperator): raise AirflowException( 'Pod returned a failure: {state}'.format(state=final_state) ) + if self.xcom_push: + return result except AirflowException as ex: raise AirflowException('Pod Launching failed: {error}'.format(error=ex)) @@ -136,6 +143,7 @@ class KubernetesPodOperator(BaseOperator): resources=None, affinity=None, config_file=None, + xcom_push=False, *args, **kwargs): super(KubernetesPodOperator, self).__init__(*args, **kwargs) @@ -156,5 +164,6 @@ class KubernetesPodOperator(BaseOperator): self.image_pull_policy = image_pull_policy self.annotations = annotations or {} self.affinity = affinity or {} + self.xcom_push = xcom_push self.resources = resources or Resources() self.config_file = config_file http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b93f888/tests/contrib/minikube/test_kubernetes_pod_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/minikube/test_kubernetes_pod_operator.py b/tests/contrib/minikube/test_kubernetes_pod_operator.py index 4cfa3b4..5c799f4 100644 --- a/tests/contrib/minikube/test_kubernetes_pod_operator.py +++ b/tests/contrib/minikube/test_kubernetes_pod_operator.py @@ -22,6 +22,7 @@ from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOpera from airflow import AirflowException from subprocess import check_call import mock +import json from airflow.contrib.kubernetes.pod_launcher import PodLauncher from airflow.contrib.kubernetes.volume_mount import VolumeMount from airflow.contrib.kubernetes.volume import Volume @@ -72,7 +73,7 @@ class KubernetesPodOperatorTest(unittest.TestCase): in_cluster=False, cluster_context='default' ) - launcher_mock.return_value = State.SUCCESS + launcher_mock.return_value = (State.SUCCESS, None) k.execute(None) client_mock.assert_called_with(in_cluster=False, cluster_context='default', @@ -167,6 +168,20 @@ class KubernetesPodOperatorTest(unittest.TestCase): with self.assertRaises(AirflowException): k.execute(None) + def test_xcom_push(self): + return_value = '{"foo": "bar"\n, "buzz": 2}' + k = KubernetesPodOperator( + namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=['echo \'{}\' > /airflow/xcom/return.json'.format(return_value)], + labels={"foo": "bar"}, + name="test", + task_id="task", + xcom_push=True + ) + self.assertEqual(k.execute(None), json.loads(return_value)) + if __name__ == '__main__': unittest.main()
