Repository: incubator-airflow
Updated Branches:
  refs/heads/master 7bdf6162f -> 2b93f8888


[AIRFLOW-2567] Extract result from the kubernetes pod as Xcom

Closes #3466 from mrkm4ntr/airflow-2567


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2b93f888
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2b93f888
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2b93f888

Branch: refs/heads/master
Commit: 2b93f888882e72cb705c90108df85565e3f8f6d3
Parents: 7bdf616
Author: Shintaro Murakami <[email protected]>
Authored: Tue Jun 19 09:54:22 2018 +0200
Committer: Fokko Driesprong <[email protected]>
Committed: Tue Jun 19 09:54:22 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/kubernetes/kube_client.py       | 17 +++---
 .../pod_request_factory.py                      | 57 ++++++++++++++++++
 airflow/contrib/kubernetes/pod_launcher.py      | 63 +++++++++++++++++---
 .../operators/kubernetes_pod_operator.py        | 13 +++-
 .../minikube/test_kubernetes_pod_operator.py    | 17 +++++-
 5 files changed, 148 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b93f888/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index be8d56e..8b71f41 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -15,21 +15,22 @@
 # specific language governing permissions and limitations
 # under the License.
 from airflow.configuration import conf
+from six import PY2
 
 
 def _load_kube_config(in_cluster, cluster_context, config_file):
     from kubernetes import config, client
     if in_cluster:
         config.load_incluster_config()
-        return client.CoreV1Api()
     else:
-        if cluster_context is None and config_file is None:
-            config.load_kube_config()
-            return client.CoreV1Api()
-        else:
-            return client.CoreV1Api(
-                api_client=config.new_client_from_config(config_file=config_file,
-                                                         context=cluster_context))
+        config.load_kube_config(config_file=config_file, context=cluster_context)
+    if PY2:
+        # For connect_get_namespaced_pod_exec
+        from kubernetes.client import Configuration
+        configuration = Configuration()
+        configuration.assert_hostname = False
+        Configuration.set_default(configuration)
+    return client.CoreV1Api()
 
 
 def get_kube_client(in_cluster=conf.getboolean('kubernetes', 'in_cluster'),

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b93f888/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
index b41ee8a..95d6c82 100644
--- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -60,3 +60,60 @@ spec:
         self.extract_annotations(pod, req)
         self.extract_affinity(pod, req)
         return req
+
+
+class ExtractXcomPodRequestFactory(KubernetesRequestFactory):
+
+    XCOM_MOUNT_PATH = '/airflow/xcom'
+    SIDECAR_CONTAINER_NAME = 'airflow-xcom-sidecar'
+    """
+    Request generator for a pod with sidecar container.
+    """
+    _yaml = """apiVersion: v1
+kind: Pod
+metadata:
+  name: name
+spec:
+  volumes:
+    - name: xcom
+      emptyDir: {{}}
+  containers:
+    - name: base
+      image: airflow-worker:latest
+      command: ["/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25"]
+      volumeMounts:
+        - name: xcom
+          mountPath: {xcomMountPath}
+    - name: {sidecarContainerName}
+      image: python:3.5-alpine
+      command: ["python", "-m", "http.server"]
+      volumeMounts:
+        - name: xcom
+          mountPath: {xcomMountPath}
+  restartPolicy: Never
+    """.format(xcomMountPath=XCOM_MOUNT_PATH, sidecarContainerName=SIDECAR_CONTAINER_NAME)
+
+    def __init__(self):
+        pass
+
+    def create(self, pod):
+        # type: (Pod) -> dict
+        req = yaml.load(self._yaml)
+        self.extract_name(pod, req)
+        self.extract_labels(pod, req)
+        self.extract_image(pod, req)
+        self.extract_image_pull_policy(pod, req)
+        self.extract_cmds(pod, req)
+        self.extract_args(pod, req)
+        self.extract_node_selector(pod, req)
+        self.extract_env_and_secrets(pod, req)
+        self.extract_volume_secrets(pod, req)
+        self.attach_volumes(pod, req)
+        self.attach_volume_mounts(pod, req)
+        self.extract_resources(pod, req)
+        self.extract_service_account_name(pod, req)
+        self.extract_init_containers(pod, req)
+        self.extract_image_pull_secrets(pod, req)
+        self.extract_annotations(pod, req)
+        self.extract_affinity(pod, req)
+        return req

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b93f888/airflow/contrib/kubernetes/pod_launcher.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py
index c1c3f30..8ac5108 100644
--- a/airflow/contrib/kubernetes/pod_launcher.py
+++ b/airflow/contrib/kubernetes/pod_launcher.py
@@ -21,9 +21,10 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
 from datetime import datetime as dt
 from airflow.contrib.kubernetes.kubernetes_request_factory import \
-    pod_request_factory as pod_fac
+    pod_request_factory as pod_factory
 from kubernetes import watch
 from kubernetes.client.rest import ApiException
+from kubernetes.stream import stream as kubernetes_stream
 from airflow import AirflowException
 from requests.exceptions import HTTPError
 from .kube_client import get_kube_client
@@ -37,12 +38,15 @@ class PodStatus(object):
 
 
 class PodLauncher(LoggingMixin):
-    def __init__(self, kube_client=None, in_cluster=True, cluster_context=None):
+    def __init__(self, kube_client=None, in_cluster=True, cluster_context=None,
+                 extract_xcom=False):
         super(PodLauncher, self).__init__()
         self._client = kube_client or get_kube_client(in_cluster=in_cluster,
                                                       cluster_context=cluster_context)
         self._watch = watch.Watch()
-        self.kube_req_factory = pod_fac.SimplePodRequestFactory()
+        self.extract_xcom = extract_xcom
+        self.kube_req_factory = pod_factory.ExtractXcomPodRequestFactory(
+        ) if extract_xcom else pod_factory.SimplePodRequestFactory()
 
     def run_pod_async(self, pod):
         req = self.kube_req_factory.create(pod)
@@ -56,7 +60,7 @@ class PodLauncher(LoggingMixin):
         return resp
 
     def run_pod(self, pod, startup_timeout=120, get_logs=True):
-        # type: (Pod) -> State
+        # type: (Pod) -> (State, result)
         """
         Launches the pod synchronously and waits for completion.
         Args:
@@ -74,25 +78,33 @@ class PodLauncher(LoggingMixin):
                 time.sleep(1)
             self.log.debug('Pod not yet started')
 
-        final_status = self._monitor_pod(pod, get_logs)
-        return final_status
+        return self._monitor_pod(pod, get_logs)
 
     def _monitor_pod(self, pod, get_logs):
-        # type: (Pod) -> State
+        # type: (Pod) -> (State, content)
 
         if get_logs:
             logs = self._client.read_namespaced_pod_log(
                 name=pod.name,
                 namespace=pod.namespace,
+                container='base',
                 follow=True,
                 tail_lines=10,
                 _preload_content=False)
             for line in logs:
                 self.log.info(line)
+        result = None
+        if self.extract_xcom:
+            while self.base_container_is_running(pod):
+                self.log.info('Container %s has state %s', pod.name, State.RUNNING)
+                time.sleep(2)
+            result = self._extract_xcom(pod)
+            self.log.info(result)
+            result = json.loads(result)
         while self.pod_is_running(pod):
             self.log.info('Pod %s has state %s', pod.name, State.RUNNING)
             time.sleep(2)
-        return self._task_status(self.read_pod(pod))
+        return (self._task_status(self.read_pod(pod)), result)
 
     def _task_status(self, event):
         self.log.info(
@@ -109,6 +121,12 @@ class PodLauncher(LoggingMixin):
         state = self._task_status(self.read_pod(pod))
         return state != State.SUCCESS and state != State.FAILED
 
+    def base_container_is_running(self, pod):
+        event = self.read_pod(pod)
+        status = next(iter(filter(lambda s: s.name == 'base',
+                                  event.status.container_statuses)), None)
+        return status.state.running is not None
+
     def read_pod(self, pod):
         try:
             return self._client.read_namespaced_pod(pod.name, pod.namespace)
@@ -117,6 +135,35 @@ class PodLauncher(LoggingMixin):
                 'There was an error reading the kubernetes API: {}'.format(e)
             )
 
+    def _extract_xcom(self, pod):
+        resp = kubernetes_stream(self._client.connect_get_namespaced_pod_exec,
+                                 pod.name, pod.namespace,
+                                 container=self.kube_req_factory.SIDECAR_CONTAINER_NAME,
+                                 command=['/bin/sh'], stdin=True, stdout=True,
+                                 stderr=True, tty=False,
+                                 _preload_content=False)
+        try:
+            result = self._exec_pod_command(
+                resp, 'cat {}/return.json'.format(self.kube_req_factory.XCOM_MOUNT_PATH))
+            self._exec_pod_command(resp, 'kill -s SIGINT 1')
+        finally:
+            resp.close()
+        if result is None:
+            raise AirflowException('Failed to extract xcom from pod: {}'.format(pod.name))
+        return result
+
+    def _exec_pod_command(self, resp, command):
+        if resp.is_open():
+            self.log.info('Running command... %s\n' % command)
+            resp.write_stdin(command + '\n')
+            while resp.is_open():
+                resp.update(timeout=1)
+                if resp.peek_stdout():
+                    return resp.read_stdout()
+                if resp.peek_stderr():
+                    self.log.info(resp.read_stderr())
+                    break
+
     def process_status(self, job_id, status):
         status = status.lower()
         if status == PodStatus.PENDING:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b93f888/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index 3e4b8a3..bf656f1 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -72,6 +72,10 @@ class KubernetesPodOperator(BaseOperator):
     :type affinity: dict
     :param config_file: The path to the Kubernetes config file
     :type config_file: str
+    :param xcom_push: If xcom_push is True, the content of the file
+        /airflow/xcom/return.json in the container will also be pushed to an
+        XCom when the container completes.
+    :type xcom_push: bool
     """
     template_fields = ('cmds', 'arguments', 'env_vars', 'config_file')
 
@@ -103,8 +107,9 @@ class KubernetesPodOperator(BaseOperator):
             pod.resources = self.resources
             pod.affinity = self.affinity
 
-            launcher = pod_launcher.PodLauncher(kube_client=client)
-            final_state = launcher.run_pod(
+            launcher = pod_launcher.PodLauncher(kube_client=client,
+                                                extract_xcom=self.xcom_push)
+            (final_state, result) = launcher.run_pod(
                 pod,
                 startup_timeout=self.startup_timeout_seconds,
                 get_logs=self.get_logs)
@@ -112,6 +117,8 @@ class KubernetesPodOperator(BaseOperator):
                 raise AirflowException(
                     'Pod returned a failure: {state}'.format(state=final_state)
                 )
+            if self.xcom_push:
+                return result
         except AirflowException as ex:
             raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
 
@@ -136,6 +143,7 @@ class KubernetesPodOperator(BaseOperator):
                  resources=None,
                  affinity=None,
                  config_file=None,
+                 xcom_push=False,
                  *args,
                  **kwargs):
         super(KubernetesPodOperator, self).__init__(*args, **kwargs)
@@ -156,5 +164,6 @@ class KubernetesPodOperator(BaseOperator):
         self.image_pull_policy = image_pull_policy
         self.annotations = annotations or {}
         self.affinity = affinity or {}
+        self.xcom_push = xcom_push
         self.resources = resources or Resources()
         self.config_file = config_file

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2b93f888/tests/contrib/minikube/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube/test_kubernetes_pod_operator.py b/tests/contrib/minikube/test_kubernetes_pod_operator.py
index 4cfa3b4..5c799f4 100644
--- a/tests/contrib/minikube/test_kubernetes_pod_operator.py
+++ b/tests/contrib/minikube/test_kubernetes_pod_operator.py
@@ -22,6 +22,7 @@ from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOpera
 from airflow import AirflowException
 from subprocess import check_call
 import mock
+import json
 from airflow.contrib.kubernetes.pod_launcher import PodLauncher
 from airflow.contrib.kubernetes.volume_mount import VolumeMount
 from airflow.contrib.kubernetes.volume import Volume
@@ -72,7 +73,7 @@ class KubernetesPodOperatorTest(unittest.TestCase):
             in_cluster=False,
             cluster_context='default'
         )
-        launcher_mock.return_value = State.SUCCESS
+        launcher_mock.return_value = (State.SUCCESS, None)
         k.execute(None)
         client_mock.assert_called_with(in_cluster=False,
                                        cluster_context='default',
@@ -167,6 +168,20 @@ class KubernetesPodOperatorTest(unittest.TestCase):
         with self.assertRaises(AirflowException):
             k.execute(None)
 
+    def test_xcom_push(self):
+        return_value = '{"foo": "bar"\n, "buzz": 2}'
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=['echo \'{}\' > /airflow/xcom/return.json'.format(return_value)],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task",
+            xcom_push=True
+        )
+        self.assertEqual(k.execute(None), json.loads(return_value))
+
 
 if __name__ == '__main__':
     unittest.main()

Reply via email to