This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push:
new ff827f7 Make Kubernetes tests pass (#9332)
ff827f7 is described below
commit ff827f78432b40cfb097e1ffc31c4ca9649888df
Author: Daniel Imberman <[email protected]>
AuthorDate: Wed Jun 17 08:44:34 2020 -0700
Make Kubernetes tests pass (#9332)
* Fixes k8s tests on 1-10-test
* add pyfile dependency
* add pyfile dependency
Co-authored-by: Daniel Imberman <[email protected]>
---
.github/workflows/ci.yml | 2 +-
.../contrib/operators/kubernetes_pod_operator.py | 6 +
airflow/kubernetes/pod_generator.py | 3 +-
kubernetes_tests/test_kubernetes_pod_operator.py | 133 ++++-----------------
.../kubernetes/app/templates/airflow.template.yaml | 43 +++++++
scripts/ci/libraries/_kind.sh | 7 +-
6 files changed, 78 insertions(+), 116 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index de37788..f2c96d1 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -106,7 +106,7 @@ jobs:
timeout-minutes: 80
name: "K8s: ${{matrix.kube-mode}} ${{matrix.python-version}}
${{matrix.kubernetes-version}}"
runs-on: ubuntu-latest
- needs: [static-checks]
+ needs: [static-checks, pyfiles]
strategy:
matrix:
python-version: [3.6, 3.7]
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index d1fba07..bfe128d 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -150,8 +150,10 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
extract_xcom=self.do_xcom_push,
image_pull_policy=self.image_pull_policy,
node_selectors=self.node_selectors,
+ priority_class_name=self.priority_class_name,
annotations=self.annotations,
affinity=self.affinity,
+ init_containers=self.init_containers,
image_pull_secrets=self.image_pull_secrets,
service_account_name=self.service_account_name,
hostnetwork=self.hostnetwork,
@@ -223,6 +225,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
annotations=None,
resources=None,
affinity=None,
+ init_containers=None,
config_file=None,
do_xcom_push=False,
node_selectors=None,
@@ -236,6 +239,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
pod_runtime_info_envs=None,
dnspolicy=None,
full_pod_spec=None,
+ priority_class_name=None,
*args,
**kwargs):
# https://github.com/apache/airflow/blob/2d0eff4ee4fafcf8c7978ac287a8fb968e56605f/UPDATING.md#unification-of-do_xcom_push-flag
@@ -257,6 +261,8 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
self.name = name
self.env_vars = env_vars or {}
self.ports = ports or []
+ self.init_containers = init_containers or []
+ self.priority_class_name = priority_class_name
self.volume_mounts = volume_mounts or []
self.volumes = volumes or []
self.secrets = secrets or []
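For context, a minimal sketch of how the two newly forwarded arguments might be used from a DAG; the image, container, and priority class names below are illustrative and not part of this commit:
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from kubernetes.client import models as k8s

# Hypothetical init container; any k8s.V1Container works here.
init_permissions = k8s.V1Container(
    name="init-permissions",
    image="busybox:1.31",
    command=["sh", "-c", "echo preparing"],
)

task = KubernetesPodOperator(
    namespace="default",
    image="ubuntu:16.04",
    cmds=["bash", "-cx"],
    arguments=["echo 10"],
    name="test-new-args",
    task_id="task",
    init_containers=[init_permissions],    # now forwarded to the generated pod spec
    priority_class_name="high-priority",   # now forwarded as spec.priorityClassName
    in_cluster=False,
    do_xcom_push=False,
)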
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 5b86161..2a5a0df 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -118,6 +118,7 @@ class PodGenerator:
security_context=None,
configmaps=None,
dnspolicy=None,
+ priority_class_name=None,
pod=None,
extract_xcom=False,
):
@@ -176,7 +177,7 @@ class PodGenerator:
self.spec.volumes = volumes or []
self.spec.node_selector = node_selectors
self.spec.restart_policy = restart_policy
-
+ self.spec.priority_class_name = priority_class_name
self.spec.image_pull_secrets = []
if image_pull_secrets:
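A minimal sketch of the same pass-through at the PodGenerator level; the image keyword and the class name are assumptions for illustration, only priority_class_name is taken from this change:
from airflow.kubernetes.pod_generator import PodGenerator

gen = PodGenerator(
    image="ubuntu:16.04",                  # assumed constructor argument
    priority_class_name="high-priority",   # illustrative class name
)
# The new keyword now lands directly on the generated V1PodSpec:
assert gen.spec.priority_class_name == "high-priority"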
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index 7eb4beb..360add1 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -74,10 +74,11 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
'labels': {
'foo': 'bar', 'kubernetes_pod_operator': 'True',
'airflow_version': airflow_version.replace('+', '-'),
- 'execution_date': '2016-01-01T0100000100-a2f50a31f',
- 'dag_id': 'dag',
- 'task_id': 'task',
- 'try_number': '1'},
+ # 'execution_date': '2016-01-01T0100000100-a2f50a31f',
+ # 'dag_id': 'dag',
+ # 'task_id': 'task',
+ # 'try_number': '1'
+ },
},
'spec': {
'affinity': {},
@@ -91,6 +92,10 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
'name': 'base',
'ports': [],
'volumeMounts': [],
+ 'resources': {'limits': {'cpu': None,
+ 'memory': None,
+ 'nvidia.com/gpu': None},
+ 'requests': {'cpu': None, 'memory': None}},
}],
'hostNetwork': False,
'imagePullSecrets': [],
@@ -229,26 +234,6 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
self.assertEqual(self.expected_pod['spec'], actual_pod['spec'])
self.assertEqual(self.expected_pod['metadata']['labels'],
actual_pod['metadata']['labels'])
- def test_pod_schedulername(self):
- scheduler_name = "default-scheduler"
- k = KubernetesPodOperator(
- namespace="default",
- image="ubuntu:16.04",
- cmds=["bash", "-cx"],
- arguments=["echo 10"],
- labels={"foo": "bar"},
- name="test",
- task_id="task",
- in_cluster=False,
- do_xcom_push=False,
- schedulername=scheduler_name
- )
- context = create_context(k)
- k.execute(context)
- actual_pod = self.api_client.sanitize_for_serialization(k.pod)
- self.expected_pod['spec']['schedulerName'] = scheduler_name
- self.assertEqual(self.expected_pod, actual_pod)
-
def test_pod_node_selectors(self):
node_selectors = {
'beta.kubernetes.io/os': 'linux'
@@ -275,10 +260,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
resources = {
'limit_cpu': 0.25,
'limit_memory': '64Mi',
- 'limit_ephemeral_storage': '2Gi',
'request_cpu': '250m',
'request_memory': '64Mi',
- 'request_ephemeral_storage': '1Gi',
}
k = KubernetesPodOperator(
namespace='default',
@@ -299,13 +282,11 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
'requests': {
'memory': '64Mi',
'cpu': '250m',
- 'ephemeral-storage': '1Gi'
},
'limits': {
'memory': '64Mi',
'cpu': 0.25,
'nvidia.com/gpu': None,
- 'ephemeral-storage': '2Gi'
}
}
self.assertEqual(self.expected_pod, actual_pod)
@@ -583,10 +564,9 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
self.expected_pod['spec']['containers'].append(container)
self.assertEqual(self.expected_pod, actual_pod)
- @patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
- @patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
+ @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
@patch("airflow.kubernetes.kube_client.get_kube_client")
- def test_envs_from_configmaps(self, mock_client, mock_monitor, mock_start):
+ def test_envs_from_configmaps(self, mock_client, mock_run):
# GIVEN
from airflow.utils.state import State
@@ -605,20 +585,19 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
configmaps=[configmap],
)
# THEN
- mock_monitor.return_value = (State.SUCCESS, None)
+ mock_run.return_value = (State.SUCCESS, None)
context = create_context(k)
k.execute(context)
self.assertEqual(
- mock_start.call_args[0][0].spec.containers[0].env_from,
+ mock_run.call_args[0][0].spec.containers[0].env_from,
[k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(
name=configmap
))]
)
- @patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
- @patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
+ @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
@patch("airflow.kubernetes.kube_client.get_kube_client")
- def test_envs_from_secrets(self, mock_client, monitor_mock, start_mock):
+ def test_envs_from_secrets(self, mock_client, mock_run):
# GIVEN
from airflow.utils.state import State
secret_ref = 'secret_name'
@@ -637,11 +616,11 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
do_xcom_push=False,
)
# THEN
- monitor_mock.return_value = (State.SUCCESS, None)
+ mock_run.return_value = (State.SUCCESS, None)
context = create_context(k)
k.execute(context)
self.assertEqual(
- start_mock.call_args[0][0].spec.containers[0].env_from,
+ mock_run.call_args[0][0].spec.containers[0].env_from,
[k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(
name=secret_ref
))]
@@ -725,66 +704,12 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
}]
self.assertEqual(self.expected_pod, actual_pod)
- @patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
- @patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
- @patch("airflow.kubernetes.kube_client.get_kube_client")
- def test_pod_template_file(
- self,
- mock_client,
- monitor_mock,
- start_mock): # pylint: disable=unused-argument
- from airflow.utils.state import State
- k = KubernetesPodOperator(
- task_id='task',
- pod_template_file='tests/kubernetes/pod.yaml',
- do_xcom_push=True
- )
- monitor_mock.return_value = (State.SUCCESS, None)
- context = create_context(k)
- k.execute(context)
- actual_pod = self.api_client.sanitize_for_serialization(k.pod)
- self.assertEqual({
- 'apiVersion': 'v1',
- 'kind': 'Pod',
- 'metadata': {'name': mock.ANY, 'namespace': 'mem-example'},
- 'spec': {
- 'volumes': [{'name': 'xcom', 'emptyDir': {}}],
- 'containers': [{
- 'args': ['--vm', '1', '--vm-bytes', '150M', '--vm-hang', '1'],
- 'command': ['stress'],
- 'image': 'polinux/stress',
- 'name': 'memory-demo-ctr',
- 'resources': {
- 'limits': {'memory': '200Mi'},
- 'requests': {'memory': '100Mi'}
- },
- 'volumeMounts': [{
- 'name': 'xcom',
- 'mountPath': '/airflow/xcom'
- }]
- }, {
- 'name': 'airflow-xcom-sidecar',
- 'image': "alpine",
- 'command': ['sh', '-c', PodDefaults.XCOM_CMD],
- 'volumeMounts': [
- {
- 'name': 'xcom',
- 'mountPath': '/airflow/xcom'
- }
- ],
- 'resources': {'requests': {'cpu': '1m'}},
- }],
- }
- }, actual_pod)
-
- @patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
- @patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
+ @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
@patch("airflow.kubernetes.kube_client.get_kube_client")
def test_pod_priority_class_name(
self,
mock_client,
- monitor_mock,
- start_mock): # pylint: disable=unused-argument
+ run_mock): # pylint: disable=unused-argument
"""Test ability to assign priorityClassName to pod
"""
@@ -804,27 +729,9 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
priority_class_name=priority_class_name,
)
- monitor_mock.return_value = (State.SUCCESS, None)
+ run_mock.return_value = (State.SUCCESS, None)
context = create_context(k)
k.execute(context)
actual_pod = self.api_client.sanitize_for_serialization(k.pod)
self.expected_pod['spec']['priorityClassName'] = priority_class_name
self.assertEqual(self.expected_pod, actual_pod)
-
- def test_pod_name(self):
- pod_name_too_long = "a" * 221
- with self.assertRaises(AirflowException):
- KubernetesPodOperator(
- namespace='default',
- image="ubuntu:16.04",
- cmds=["bash", "-cx"],
- arguments=["echo 10"],
- labels={"foo": "bar"},
- name=pod_name_too_long,
- task_id="task",
- in_cluster=False,
- do_xcom_push=False,
- )
-
-
-# pylint: enable=unused-argument
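For reference, a sketch of the single-mock pattern the rewritten tests now use: on v1-10-test the launcher exposes run_pod, so one patch stands in for the old start_pod/monitor_pod pair. It reuses the create_context helper already defined in this test module; the operator arguments are illustrative only.
from unittest import mock

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.utils.state import State

@mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
@mock.patch("airflow.kubernetes.kube_client.get_kube_client")
def test_pod_built_without_a_cluster(mock_client, mock_run):
    # run_pod returns a (state, result) tuple; the tests only need SUCCESS.
    mock_run.return_value = (State.SUCCESS, None)
    k = KubernetesPodOperator(
        namespace="default",
        image="ubuntu:16.04",
        cmds=["bash", "-cx"],
        arguments=["echo 10"],
        name="test",
        task_id="task",
        in_cluster=False,
        do_xcom_push=False,
    )
    k.execute(create_context(k))  # create_context comes from this test module
    # The fully built V1Pod is the first positional argument handed to run_pod.
    assert mock_run.call_args[0][0].spec.containers[0].image == "ubuntu:16.04"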
diff --git a/scripts/ci/kubernetes/app/templates/airflow.template.yaml b/scripts/ci/kubernetes/app/templates/airflow.template.yaml
index 3f2465d..f3704d6 100644
--- a/scripts/ci/kubernetes/app/templates/airflow.template.yaml
+++ b/scripts/ci/kubernetes/app/templates/airflow.template.yaml
@@ -162,3 +162,46 @@ spec:
nodePort: 30809
selector:
name: airflow
+---
+apiVersion: v1
+kind: Pod
+metadata:
+ name: init-dags
+ namespace: test-namespace
+spec:
+ containers:
+ - name: "init-dags-test-namespace"
+ image: {{AIRFLOW_KUBERNETES_IMAGE}}
+ imagePullPolicy: Never
+ securityContext:
+ runAsUser: 0
+ volumeMounts:
+ - name: airflow-configmap
+ mountPath: /opt/airflow/airflow.cfg
+ subPath: airflow.cfg
+ - name: {{INIT_DAGS_VOLUME_NAME}}
+ mountPath: /opt/airflow/dags
+ env:
+ - name: SQL_ALCHEMY_CONN
+ valueFrom:
+ secretKeyRef:
+ name: airflow-secrets
+ key: sql_alchemy_conn
+ command:
+ - "bash"
+ args:
+ - "-cx"
+ - "/tmp/airflow-test-env-init-dags.sh"
+ volumes:
+ - name: airflow-dags
+ persistentVolumeClaim:
+ claimName: airflow-dags
+ - name: airflow-dags-fake
+ emptyDir: {}
+ - name: airflow-dags-git
+ emptyDir: {}
+ - name: airflow-logs
+ emptyDir: {}
+ - name: airflow-configmap
+ configMap:
+ name: airflow-configmap
diff --git a/scripts/ci/libraries/_kind.sh b/scripts/ci/libraries/_kind.sh
index b89ea90..1d8b4ad 100644
--- a/scripts/ci/libraries/_kind.sh
+++ b/scripts/ci/libraries/_kind.sh
@@ -368,8 +368,13 @@ function apply_kubernetes_resources() {
kubectl apply -f "${KUBERNETES_APP_DIR}/secrets.yaml" --cluster "${KUBECTL_CLUSTER_NAME}"
kubectl apply -f "${BUILD_DIRNAME}/configmaps.yaml" --cluster "${KUBECTL_CLUSTER_NAME}"
- kubectl apply -f "${KUBERNETES_APP_DIR}/postgres.yaml" --cluster "${KUBECTL_CLUSTER_NAME}"
kubectl apply -f "${KUBERNETES_APP_DIR}/volumes.yaml" --cluster "${KUBECTL_CLUSTER_NAME}"
+
+ kubectl apply -f "${KUBERNETES_APP_DIR}/secrets.yaml" --cluster "${KUBECTL_CLUSTER_NAME}" -n "test-namespace"
+ kubectl apply -f "${BUILD_DIRNAME}/configmaps.yaml" --cluster "${KUBECTL_CLUSTER_NAME}" -n "test-namespace"
+ kubectl apply -f "${KUBERNETES_APP_DIR}/volumes.yaml" --cluster "${KUBECTL_CLUSTER_NAME}" -n "test-namespace"
+
+ kubectl apply -f "${KUBERNETES_APP_DIR}/postgres.yaml" --cluster "${KUBECTL_CLUSTER_NAME}"
kubectl apply -f "${BUILD_DIRNAME}/airflow.yaml" --cluster "${KUBECTL_CLUSTER_NAME}"
}