This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new ff827f7  Make Kubernetes tests pass (#9332)
ff827f7 is described below

commit ff827f78432b40cfb097e1ffc31c4ca9649888df
Author: Daniel Imberman <[email protected]>
AuthorDate: Wed Jun 17 08:44:34 2020 -0700

    Make Kubernetes tests pass (#9332)
    
    * Fixes k8s tests on 1-10-test
    
    * add pyfile dependency
    
    * add pyfile dependency
    
    Co-authored-by: Daniel Imberman <[email protected]>
---
 .github/workflows/ci.yml                           |   2 +-
 .../contrib/operators/kubernetes_pod_operator.py   |   6 +
 airflow/kubernetes/pod_generator.py                |   3 +-
 kubernetes_tests/test_kubernetes_pod_operator.py   | 133 ++++-----------------
 .../kubernetes/app/templates/airflow.template.yaml |  43 +++++++
 scripts/ci/libraries/_kind.sh                      |   7 +-
 6 files changed, 78 insertions(+), 116 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index de37788..f2c96d1 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -106,7 +106,7 @@ jobs:
     timeout-minutes: 80
     name: "K8s: ${{matrix.kube-mode}} ${{matrix.python-version}} ${{matrix.kubernetes-version}}"
     runs-on: ubuntu-latest
-    needs: [static-checks]
+    needs: [static-checks, pyfiles]
     strategy:
       matrix:
         python-version: [3.6, 3.7]
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index d1fba07..bfe128d 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -150,8 +150,10 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
                 extract_xcom=self.do_xcom_push,
                 image_pull_policy=self.image_pull_policy,
                 node_selectors=self.node_selectors,
+                priority_class_name=self.priority_class_name,
                 annotations=self.annotations,
                 affinity=self.affinity,
+                init_containers=self.init_containers,
                 image_pull_secrets=self.image_pull_secrets,
                 service_account_name=self.service_account_name,
                 hostnetwork=self.hostnetwork,
@@ -223,6 +225,7 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
                  annotations=None,
                  resources=None,
                  affinity=None,
+                 init_containers=None,
                  config_file=None,
                  do_xcom_push=False,
                  node_selectors=None,
@@ -236,6 +239,7 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
                  pod_runtime_info_envs=None,
                  dnspolicy=None,
                  full_pod_spec=None,
+                 priority_class_name=None,
                  *args,
                  **kwargs):
         # https://github.com/apache/airflow/blob/2d0eff4ee4fafcf8c7978ac287a8fb968e56605f/UPDATING.md#unification-of-do_xcom_push-flag
@@ -257,6 +261,8 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
         self.name = name
         self.env_vars = env_vars or {}
         self.ports = ports or []
+        self.init_containers = init_containers or []
+        self.priority_class_name = priority_class_name
         self.volume_mounts = volume_mounts or []
         self.volumes = volumes or []
         self.secrets = secrets or []
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 5b86161..2a5a0df 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -118,6 +118,7 @@ class PodGenerator:
         security_context=None,
         configmaps=None,
         dnspolicy=None,
+        priority_class_name=None,
         pod=None,
         extract_xcom=False,
     ):
@@ -176,7 +177,7 @@ class PodGenerator:
         self.spec.volumes = volumes or []
         self.spec.node_selector = node_selectors
         self.spec.restart_policy = restart_policy
-
+        self.spec.priority_class_name = priority_class_name
         self.spec.image_pull_secrets = []
 
         if image_pull_secrets:
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index 7eb4beb..360add1 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -74,10 +74,11 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
                 'labels': {
                     'foo': 'bar', 'kubernetes_pod_operator': 'True',
                     'airflow_version': airflow_version.replace('+', '-'),
-                    'execution_date': '2016-01-01T0100000100-a2f50a31f',
-                    'dag_id': 'dag',
-                    'task_id': 'task',
-                    'try_number': '1'},
+                    # 'execution_date': '2016-01-01T0100000100-a2f50a31f',
+                    # 'dag_id': 'dag',
+                    # 'task_id': 'task',
+                    # 'try_number': '1'
+                },
             },
             'spec': {
                 'affinity': {},
@@ -91,6 +92,10 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
                     'name': 'base',
                     'ports': [],
                     'volumeMounts': [],
+                    'resources': {'limits': {'cpu': None,
+                                             'memory': None,
+                                             'nvidia.com/gpu': None},
+                                  'requests': {'cpu': None, 'memory': None}},
                 }],
                 'hostNetwork': False,
                 'imagePullSecrets': [],
@@ -229,26 +234,6 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
         self.assertEqual(self.expected_pod['spec'], actual_pod['spec'])
         self.assertEqual(self.expected_pod['metadata']['labels'], actual_pod['metadata']['labels'])
 
-    def test_pod_schedulername(self):
-        scheduler_name = "default-scheduler"
-        k = KubernetesPodOperator(
-            namespace="default",
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
-            arguments=["echo 10"],
-            labels={"foo": "bar"},
-            name="test",
-            task_id="task",
-            in_cluster=False,
-            do_xcom_push=False,
-            schedulername=scheduler_name
-        )
-        context = create_context(k)
-        k.execute(context)
-        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
-        self.expected_pod['spec']['schedulerName'] = scheduler_name
-        self.assertEqual(self.expected_pod, actual_pod)
-
     def test_pod_node_selectors(self):
         node_selectors = {
             'beta.kubernetes.io/os': 'linux'
@@ -275,10 +260,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
         resources = {
             'limit_cpu': 0.25,
             'limit_memory': '64Mi',
-            'limit_ephemeral_storage': '2Gi',
             'request_cpu': '250m',
             'request_memory': '64Mi',
-            'request_ephemeral_storage': '1Gi',
         }
         k = KubernetesPodOperator(
             namespace='default',
@@ -299,13 +282,11 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             'requests': {
                 'memory': '64Mi',
                 'cpu': '250m',
-                'ephemeral-storage': '1Gi'
             },
             'limits': {
                 'memory': '64Mi',
                 'cpu': 0.25,
                 'nvidia.com/gpu': None,
-                'ephemeral-storage': '2Gi'
             }
         }
         self.assertEqual(self.expected_pod, actual_pod)
@@ -583,10 +564,9 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
         self.expected_pod['spec']['containers'].append(container)
         self.assertEqual(self.expected_pod, actual_pod)
 
-    @patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
-    @patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
     @patch("airflow.kubernetes.kube_client.get_kube_client")
-    def test_envs_from_configmaps(self, mock_client, mock_monitor, mock_start):
+    def test_envs_from_configmaps(self, mock_client, mock_run):
         # GIVEN
         from airflow.utils.state import State
 
@@ -605,20 +585,19 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             configmaps=[configmap],
         )
         # THEN
-        mock_monitor.return_value = (State.SUCCESS, None)
+        mock_run.return_value = (State.SUCCESS, None)
         context = create_context(k)
         k.execute(context)
         self.assertEqual(
-            mock_start.call_args[0][0].spec.containers[0].env_from,
+            mock_run.call_args[0][0].spec.containers[0].env_from,
             [k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(
                 name=configmap
             ))]
         )
 
-    @patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
-    @patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
     @patch("airflow.kubernetes.kube_client.get_kube_client")
-    def test_envs_from_secrets(self, mock_client, monitor_mock, start_mock):
+    def test_envs_from_secrets(self, mock_client, mock_run):
         # GIVEN
         from airflow.utils.state import State
         secret_ref = 'secret_name'
@@ -637,11 +616,11 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             do_xcom_push=False,
         )
         # THEN
-        monitor_mock.return_value = (State.SUCCESS, None)
+        mock_run.return_value = (State.SUCCESS, None)
         context = create_context(k)
         k.execute(context)
         self.assertEqual(
-            start_mock.call_args[0][0].spec.containers[0].env_from,
+            mock_run.call_args[0][0].spec.containers[0].env_from,
             [k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(
                 name=secret_ref
             ))]
@@ -725,66 +704,12 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
         }]
         self.assertEqual(self.expected_pod, actual_pod)
 
-    @patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
-    @patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
-    @patch("airflow.kubernetes.kube_client.get_kube_client")
-    def test_pod_template_file(
-            self,
-            mock_client,
-            monitor_mock,
-            start_mock):  # pylint: disable=unused-argument
-        from airflow.utils.state import State
-        k = KubernetesPodOperator(
-            task_id='task',
-            pod_template_file='tests/kubernetes/pod.yaml',
-            do_xcom_push=True
-        )
-        monitor_mock.return_value = (State.SUCCESS, None)
-        context = create_context(k)
-        k.execute(context)
-        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
-        self.assertEqual({
-            'apiVersion': 'v1',
-            'kind': 'Pod',
-            'metadata': {'name': mock.ANY, 'namespace': 'mem-example'},
-            'spec': {
-                'volumes': [{'name': 'xcom', 'emptyDir': {}}],
-                'containers': [{
-                    'args': ['--vm', '1', '--vm-bytes', '150M', '--vm-hang', '1'],
-                    'command': ['stress'],
-                    'image': 'polinux/stress',
-                    'name': 'memory-demo-ctr',
-                    'resources': {
-                        'limits': {'memory': '200Mi'},
-                        'requests': {'memory': '100Mi'}
-                    },
-                    'volumeMounts': [{
-                        'name': 'xcom',
-                        'mountPath': '/airflow/xcom'
-                    }]
-                }, {
-                    'name': 'airflow-xcom-sidecar',
-                    'image': "alpine",
-                    'command': ['sh', '-c', PodDefaults.XCOM_CMD],
-                    'volumeMounts': [
-                        {
-                            'name': 'xcom',
-                            'mountPath': '/airflow/xcom'
-                        }
-                    ],
-                    'resources': {'requests': {'cpu': '1m'}},
-                }],
-            }
-        }, actual_pod)
-
-    @patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
-    @patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
     @patch("airflow.kubernetes.kube_client.get_kube_client")
     def test_pod_priority_class_name(
             self,
             mock_client,
-            monitor_mock,
-            start_mock):  # pylint: disable=unused-argument
+            run_mock):  # pylint: disable=unused-argument
         """Test ability to assign priorityClassName to pod
 
         """
@@ -804,27 +729,9 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             priority_class_name=priority_class_name,
         )
 
-        monitor_mock.return_value = (State.SUCCESS, None)
+        run_mock.return_value = (State.SUCCESS, None)
         context = create_context(k)
         k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         self.expected_pod['spec']['priorityClassName'] = priority_class_name
         self.assertEqual(self.expected_pod, actual_pod)
-
-    def test_pod_name(self):
-        pod_name_too_long = "a" * 221
-        with self.assertRaises(AirflowException):
-            KubernetesPodOperator(
-                namespace='default',
-                image="ubuntu:16.04",
-                cmds=["bash", "-cx"],
-                arguments=["echo 10"],
-                labels={"foo": "bar"},
-                name=pod_name_too_long,
-                task_id="task",
-                in_cluster=False,
-                do_xcom_push=False,
-            )
-
-
-# pylint: enable=unused-argument
diff --git a/scripts/ci/kubernetes/app/templates/airflow.template.yaml b/scripts/ci/kubernetes/app/templates/airflow.template.yaml
index 3f2465d..f3704d6 100644
--- a/scripts/ci/kubernetes/app/templates/airflow.template.yaml
+++ b/scripts/ci/kubernetes/app/templates/airflow.template.yaml
@@ -162,3 +162,46 @@ spec:
       nodePort: 30809
   selector:
     name: airflow
+---
+apiVersion: v1
+kind: Pod
+metadata:
+  name: init-dags
+  namespace: test-namespace
+spec:
+  containers:
+    - name: "init-dags-test-namespace"
+      image: {{AIRFLOW_KUBERNETES_IMAGE}}
+      imagePullPolicy: Never
+      securityContext:
+        runAsUser: 0
+      volumeMounts:
+        - name: airflow-configmap
+          mountPath: /opt/airflow/airflow.cfg
+          subPath: airflow.cfg
+        - name: {{INIT_DAGS_VOLUME_NAME}}
+          mountPath: /opt/airflow/dags
+      env:
+        - name: SQL_ALCHEMY_CONN
+          valueFrom:
+            secretKeyRef:
+              name: airflow-secrets
+              key: sql_alchemy_conn
+      command:
+        - "bash"
+      args:
+        - "-cx"
+        - "/tmp/airflow-test-env-init-dags.sh"
+  volumes:
+  - name: airflow-dags
+    persistentVolumeClaim:
+      claimName: airflow-dags
+  - name: airflow-dags-fake
+    emptyDir: {}
+  - name: airflow-dags-git
+    emptyDir: {}
+  - name: airflow-logs
+    emptyDir: {}
+  - name: airflow-configmap
+    configMap:
+      name: airflow-configmap
diff --git a/scripts/ci/libraries/_kind.sh b/scripts/ci/libraries/_kind.sh
index b89ea90..1d8b4ad 100644
--- a/scripts/ci/libraries/_kind.sh
+++ b/scripts/ci/libraries/_kind.sh
@@ -368,8 +368,13 @@ function apply_kubernetes_resources() {
 
     kubectl apply -f "${KUBERNETES_APP_DIR}/secrets.yaml" --cluster "${KUBECTL_CLUSTER_NAME}"
     kubectl apply -f "${BUILD_DIRNAME}/configmaps.yaml" --cluster "${KUBECTL_CLUSTER_NAME}"
-    kubectl apply -f "${KUBERNETES_APP_DIR}/postgres.yaml" --cluster "${KUBECTL_CLUSTER_NAME}"
     kubectl apply -f "${KUBERNETES_APP_DIR}/volumes.yaml" --cluster "${KUBECTL_CLUSTER_NAME}"
+
+    kubectl apply -f "${KUBERNETES_APP_DIR}/secrets.yaml" --cluster "${KUBECTL_CLUSTER_NAME}" -n "test-namespace"
+    kubectl apply -f "${BUILD_DIRNAME}/configmaps.yaml" --cluster "${KUBECTL_CLUSTER_NAME}" -n "test-namespace"
+    kubectl apply -f "${KUBERNETES_APP_DIR}/volumes.yaml" --cluster "${KUBECTL_CLUSTER_NAME}" -n "test-namespace"
+
+    kubectl apply -f "${KUBERNETES_APP_DIR}/postgres.yaml" --cluster "${KUBECTL_CLUSTER_NAME}"
     kubectl apply -f "${BUILD_DIRNAME}/airflow.yaml" --cluster "${KUBECTL_CLUSTER_NAME}"
 }
 

Reply via email to