This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push:
new d452544 Allow overrides for pod_template_file (#11162)
d452544 is described below
commit d4525440f101be82740110e492581404a8f9ac5c
Author: Daniel Imberman <[email protected]>
AuthorDate: Sun Sep 27 14:39:35 2020 -0700
Allow overrides for pod_template_file (#11162)
* Allow overrides for pod_template_file
A pod_template_file should be treated as a *template* not a steadfast
rule.
This PR ensures that users can override individual values set by the
pod_template_file s.t. the same file can be used for multiple tasks.
* fix podtemplatetest
* fix name
(cherry picked from commit a888198c27bcdbc4538c02360c308ffcaca182fa)
---
.../contrib/operators/kubernetes_pod_operator.py | 94 ++++++++++--------
kubernetes_tests/test_kubernetes_pod_operator.py | 110 ++++++++++++++-------
2 files changed, 130 insertions(+), 74 deletions(-)
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py
b/airflow/contrib/operators/kubernetes_pod_operator.py
index cdf5076..d9570d9 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -28,6 +28,7 @@ from airflow.utils.decorators import apply_defaults
from airflow.utils.helpers import validate_key
from airflow.utils.state import State
from airflow.version import version as airflow_version
+from airflow.kubernetes.pod_generator import PodGenerator
class KubernetesPodOperator(BaseOperator): # pylint:
disable=too-many-instance-attributes
@@ -272,6 +273,9 @@ class KubernetesPodOperator(BaseOperator): # pylint:
disable=too-many-instance-
client =
kube_client.get_kube_client(cluster_context=self.cluster_context,
config_file=self.config_file)
+ self.pod = self.create_pod_request_obj()
+ self.namespace = self.pod.metadata.namespace
+
self.client = client
# Add combination of labels to uniquely identify a running pod
@@ -356,45 +360,57 @@ class KubernetesPodOperator(BaseOperator): # pylint:
disable=too-many-instance-
Creates a V1Pod based on user parameters. Note that a `pod` or
`pod_template_file`
will supersede all other values.
"""
- pod = pod_generator.PodGenerator(
- image=self.image,
- namespace=self.namespace,
- cmds=self.cmds,
- args=self.arguments,
- labels=self.labels,
- name=self.name,
- envs=self.env_vars,
- extract_xcom=self.do_xcom_push,
- image_pull_policy=self.image_pull_policy,
- node_selectors=self.node_selectors,
- annotations=self.annotations,
- affinity=self.affinity,
- image_pull_secrets=self.image_pull_secrets,
- service_account_name=self.service_account_name,
- hostnetwork=self.hostnetwork,
- tolerations=self.tolerations,
- configmaps=self.configmaps,
- security_context=self.security_context,
- dnspolicy=self.dnspolicy,
- init_containers=self.init_containers,
- restart_policy='Never',
- schedulername=self.schedulername,
- pod_template_file=self.pod_template_file,
- priority_class_name=self.priority_class_name,
- pod=self.full_pod_spec,
- ).gen_pod()
-
- # noinspection PyTypeChecker
- pod = append_to_pod(
- pod,
- self.pod_runtime_info_envs + # type: ignore
- self.ports + # type: ignore
- self.resources + # type: ignore
- self.secrets + # type: ignore
- self.volumes + # type: ignore
- self.volume_mounts # type: ignore
+ if self.pod_template_file:
+ pod_template =
pod_generator.PodGenerator.deserialize_model_file(self.pod_template_file)
+ else:
+ pod_template = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="name"))
+
+ pod = k8s.V1Pod(
+ api_version="v1",
+ kind="Pod",
+ metadata=k8s.V1ObjectMeta(
+ namespace=self.namespace,
+ labels=self.labels,
+ name=self.name,
+ annotations=self.annotations,
+
+ ),
+ spec=k8s.V1PodSpec(
+ node_selector=self.node_selectors,
+ affinity=self.affinity,
+ tolerations=self.tolerations,
+ init_containers=self.init_containers,
+ containers=[
+ k8s.V1Container(
+ image=self.image,
+ name="base",
+ command=self.cmds,
+ ports=self.ports,
+ resources=self.k8s_resources,
+ volume_mounts=self.volume_mounts,
+ args=self.arguments,
+ env=self.env_vars,
+ env_from=self.env_from,
+ )
+ ],
+ image_pull_secrets=self.image_pull_secrets,
+ service_account_name=self.service_account_name,
+ host_network=self.hostnetwork,
+ security_context=self.security_context,
+ dns_policy=self.dnspolicy,
+ scheduler_name=self.schedulername,
+ restart_policy='Never',
+ priority_class_name=self.priority_class_name,
+ volumes=self.volumes,
+ )
)
+ pod = PodGenerator.reconcile_pods(pod_template, pod)
+
+ for secret in self.secrets:
+ pod = secret.attach_to_pod(pod)
+ if self.do_xcom_push:
+ pod = PodGenerator.add_xcom_sidecar(pod)
return pod
def create_new_pod_for_operator(self, labels, launcher):
@@ -435,9 +451,9 @@ class KubernetesPodOperator(BaseOperator): # pylint:
disable=too-many-instance-
def monitor_launched_pod(self, launcher, pod):
"""
- Montitors a pod to completion that was created by a previous
KubernetesPodOperator
+ Monitors a pod to completion that was created by a previous
KubernetesPodOperator
- @param launcher: pod launcher that will manage launching and
monitoring pods
+ :param launcher: pod launcher that will manage launching and
monitoring pods
:param pod: podspec used to find pod using k8s API
:return:
"""
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py
b/kubernetes_tests/test_kubernetes_pod_operator.py
index 0335b58..d0e5d5f 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -834,6 +834,24 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
self.assertIsNotNone(result)
self.assertDictEqual(result, {"hello": "world"})
+ def test_pod_template_file_with_overrides_system(self):
+ fixture = sys.path[0] + '/tests/kubernetes/basic_pod.yaml'
+ k = KubernetesPodOperator(
+ task_id="task" + self.get_current_task_name(),
+ labels={"foo": "bar", "fizz": "buzz"},
+ env_vars=[k8s.V1EnvVar(name="env_name", value="value")],
+ in_cluster=False,
+ pod_template_file=fixture,
+ do_xcom_push=True
+ )
+
+ context = create_context(k)
+ result = k.execute(context)
+ self.assertIsNotNone(result)
+ self.assertEqual(k.pod.metadata.labels, {'fizz': 'buzz', 'foo': 'bar'})
+ self.assertEqual(k.pod.spec.containers[0].env,
[k8s.V1EnvVar(name="env_name", value="value")])
+ self.assertDictEqual(result, {"hello": "world"})
+
def test_init_container(self):
# GIVEN
volume_mounts = [k8s.V1VolumeMount(
@@ -923,42 +941,64 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
do_xcom_push=True
)
monitor_mock.return_value = (State.SUCCESS, None)
- context = self.create_context(k)
- k.execute(context)
+ context = create_context(k)
+ with self.assertLogs(k.log, level=logging.DEBUG) as cm:
+ k.execute(context)
+ expected_line = textwrap.dedent("""\
+ DEBUG:airflow.task.operators:Starting pod:
+ api_version: v1
+ kind: Pod
+ metadata:
+ annotations: {}
+ cluster_name: null
+ creation_timestamp: null
+ deletion_grace_period_seconds: null\
+ """).strip()
+ self.assertTrue(any(line.startswith(expected_line) for line in
cm.output))
+
actual_pod = self.api_client.sanitize_for_serialization(k.pod)
- self.assertEqual({
- 'apiVersion': 'v1',
- 'kind': 'Pod',
- 'metadata': {'name': mock.ANY, 'namespace': 'mem-example'},
- 'spec': {
- 'volumes': [{'name': 'xcom', 'emptyDir': {}}],
- 'containers': [{
- 'args': ['--vm', '1', '--vm-bytes', '150M', '--vm-hang',
'1'],
- 'command': ['stress'],
- 'image': 'apache/airflow:stress-2020.07.10-1.0.4',
- 'name': 'memory-demo-ctr',
- 'resources': {
- 'limits': {'memory': '200Mi'},
- 'requests': {'memory': '100Mi'}
- },
- 'volumeMounts': [{
- 'name': 'xcom',
- 'mountPath': '/airflow/xcom'
- }]
- }, {
- 'name': 'airflow-xcom-sidecar',
- 'image': "alpine",
- 'command': ['sh', '-c', PodDefaults.XCOM_CMD],
- 'volumeMounts': [
- {
- 'name': 'xcom',
- 'mountPath': '/airflow/xcom'
- }
- ],
- 'resources': {'requests': {'cpu': '1m'}},
- }],
- }
- }, actual_pod)
+ expected_dict = {'apiVersion': 'v1',
+ 'kind': 'Pod',
+ 'metadata': {'annotations': {},
+ 'labels': {},
+ 'name': 'memory-demo',
+ 'namespace': 'mem-example'},
+ 'spec': {'affinity': {},
+ 'containers': [{'args': ['--vm',
+ '1',
+ '--vm-bytes',
+ '150M',
+ '--vm-hang',
+ '1'],
+ 'command': ['stress'],
+ 'env': [],
+ 'envFrom': [],
+ 'image':
'apache/airflow:stress-2020.07.10-1.0.4',
+ 'name': 'base',
+ 'ports': [],
+ 'resources': {'limits':
{'memory': '200Mi'},
+ 'requests':
{'memory': '100Mi'}},
+ 'volumeMounts':
[{'mountPath': '/airflow/xcom',
+ 'name':
'xcom'}]},
+ {'command': ['sh',
+ '-c',
+ 'trap "exit 0"
INT; while true; do sleep '
+ '30; done;'],
+ 'image': 'alpine',
+ 'name':
'airflow-xcom-sidecar',
+ 'resources': {'requests':
{'cpu': '1m'}},
+ 'volumeMounts':
[{'mountPath': '/airflow/xcom',
+ 'name':
'xcom'}]}],
+ 'hostNetwork': False,
+ 'imagePullSecrets': [],
+ 'initContainers': [],
+ 'nodeSelector': {},
+ 'restartPolicy': 'Never',
+ 'securityContext': {},
+ 'serviceAccountName': 'default',
+ 'tolerations': [],
+ 'volumes': [{'emptyDir': {}, 'name':
'xcom'}]}}
+ self.assertEqual(expected_dict, actual_pod)
@mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
@mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")