XD-DENG commented on a change in pull request #12494:
URL: https://github.com/apache/airflow/pull/12494#discussion_r527194194
##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -31,130 +30,146 @@
'owner': 'airflow',
}
-with DAG(
- dag_id='example_kubernetes_executor_config',
- default_args=default_args,
- schedule_interval=None,
- start_date=days_ago(2),
- tags=['example3'],
-) as dag:
+log = logging.getLogger(__name__)
+
+try:
+ from kubernetes.client import models as k8s
+
+ with DAG(
+ dag_id='example_kubernetes_executor_config',
+ default_args=default_args,
+ schedule_interval=None,
+ start_date=days_ago(2),
+ tags=['example3'],
+ ) as dag:
- def test_sharedvolume_mount():
- """
- Tests whether the volume has been mounted.
- """
- for i in range(5):
- try:
- return_code = os.system("cat /shared/test.txt")
- if return_code != 0:
- raise ValueError(f"Error when checking volume mount.
Return code {return_code}")
- except ValueError as e:
- if i > 4:
- raise e
+ def test_sharedvolume_mount():
+ """
+ Tests whether the volume has been mounted.
+ """
+ for i in range(5):
+ try:
+ return_code = os.system("cat /shared/test.txt")
+ if return_code != 0:
+ raise ValueError(f"Error when checking volume mount.
Return code {return_code}")
+ except ValueError as e:
+ if i > 4:
+ raise e
- def test_volume_mount():
- """
- Tests whether the volume has been mounted.
- """
- with open('/foo/volume_mount_test.txt', 'w') as foo:
- foo.write('Hello')
+ def test_volume_mount():
+ """
+ Tests whether the volume has been mounted.
+ """
+ with open('/foo/volume_mount_test.txt', 'w') as foo:
+ foo.write('Hello')
- return_code = os.system("cat /foo/volume_mount_test.txt")
- if return_code != 0:
- raise ValueError(f"Error when checking volume mount. Return code
{return_code}")
+ return_code = os.system("cat /foo/volume_mount_test.txt")
+ if return_code != 0:
+ raise ValueError(f"Error when checking volume mount. Return
code {return_code}")
- # You can use annotations on your kubernetes pods!
- start_task = PythonOperator(
- task_id="start_task",
- python_callable=print_stuff,
- executor_config={
- "pod_override":
k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
- },
- )
+ # You can use annotations on your kubernetes pods!
+ start_task = PythonOperator(
+ task_id="start_task",
+ python_callable=print_stuff,
+ executor_config={
+ "pod_override":
k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
+ },
+ )
- # [START task_with_volume]
- volume_task = PythonOperator(
- task_id="task_with_volume",
- python_callable=test_volume_mount,
- executor_config={
- "pod_override": k8s.V1Pod(
- spec=k8s.V1PodSpec(
- containers=[
- k8s.V1Container(
- name="base",
- volume_mounts=[
- k8s.V1VolumeMount(mount_path="/foo/",
name="example-kubernetes-test-volume")
- ],
- )
- ],
- volumes=[
- k8s.V1Volume(
- name="example-kubernetes-test-volume",
- host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
- )
- ],
- )
- ),
- },
- )
- # [END task_with_volume]
+ # [START task_with_volume]
+ volume_task = PythonOperator(
+ task_id="task_with_volume",
+ python_callable=test_volume_mount,
+ executor_config={
+ "pod_override": k8s.V1Pod(
+ spec=k8s.V1PodSpec(
+ containers=[
+ k8s.V1Container(
+ name="base",
+ volume_mounts=[
+ k8s.V1VolumeMount(
+ mount_path="/foo/",
name="example-kubernetes-test-volume"
+ )
+ ],
+ )
+ ],
+ volumes=[
+ k8s.V1Volume(
+ name="example-kubernetes-test-volume",
+
host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
+ )
+ ],
+ )
+ ),
+ },
+ )
+ # [END task_with_volume]
- # [START task_with_template]
- task_with_template = PythonOperator(
- task_id="task_with_template",
- python_callable=print_stuff,
- executor_config={
- "pod_template_file":
"/usr/local/airflow/pod_templates/basic_template.yaml",
- "pod_override":
k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
- },
- )
- # [END task_with_template]
+ # [START task_with_template]
+ task_with_template = PythonOperator(
+ task_id="task_with_template",
+ python_callable=print_stuff,
+ executor_config={
+ "pod_template_file":
"/usr/local/airflow/pod_templates/basic_template.yaml",
+ "pod_override":
k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
+ },
+ )
+ # [END task_with_template]
- # [START task_with_sidecar]
- sidecar_task = PythonOperator(
- task_id="task_with_sidecar",
- python_callable=test_sharedvolume_mount,
- executor_config={
- "pod_override": k8s.V1Pod(
- spec=k8s.V1PodSpec(
- containers=[
- k8s.V1Container(
- name="base",
-
volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/",
name="shared-empty-dir")],
- ),
- k8s.V1Container(
- name="sidecar",
- image="ubuntu",
- args=["echo \"retrieved from mount\" >
/shared/test.txt"],
- command=["bash", "-cx"],
-
volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/",
name="shared-empty-dir")],
- ),
- ],
- volumes=[
- k8s.V1Volume(name="shared-empty-dir",
empty_dir=k8s.V1EmptyDirVolumeSource()),
- ],
- )
- ),
- },
- )
- # [END task_with_sidecar]
+ # [START task_with_sidecar]
+ sidecar_task = PythonOperator(
+ task_id="task_with_sidecar",
+ python_callable=test_sharedvolume_mount,
+ executor_config={
+ "pod_override": k8s.V1Pod(
+ spec=k8s.V1PodSpec(
+ containers=[
+ k8s.V1Container(
+ name="base",
+ volume_mounts=[
+ k8s.V1VolumeMount(mount_path="/shared/",
name="shared-empty-dir")
+ ],
+ ),
+ k8s.V1Container(
+ name="sidecar",
+ image="ubuntu",
+ args=["echo \"retrieved from mount\" >
/shared/test.txt"],
+ command=["bash", "-cx"],
+ volume_mounts=[
+ k8s.V1VolumeMount(mount_path="/shared/",
name="shared-empty-dir")
+ ],
+ ),
+ ],
+ volumes=[
+ k8s.V1Volume(name="shared-empty-dir",
empty_dir=k8s.V1EmptyDirVolumeSource()),
+ ],
+ )
+ ),
+ },
+ )
+ # [END task_with_sidecar]
- # Test that we can add labels to pods
- third_task = PythonOperator(
- task_id="non_root_task",
- python_callable=print_stuff,
- executor_config={"pod_override":
k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))},
- )
+ # Test that we can add labels to pods
+ third_task = PythonOperator(
+ task_id="non_root_task",
+ python_callable=print_stuff,
+ executor_config={
+ "pod_override":
k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
+ },
+ )
- other_ns_task = PythonOperator(
- task_id="other_namespace_task",
- python_callable=print_stuff,
- executor_config={
- "KubernetesExecutor": {"namespace": "test-namespace", "labels":
{"release": "stable"}}
- },
- )
+ other_ns_task = PythonOperator(
+ task_id="other_namespace_task",
+ python_callable=print_stuff,
+ executor_config={
+ "KubernetesExecutor": {"namespace": "test-namespace",
"labels": {"release": "stable"}}
+ },
+ )
- start_task >> volume_task >> third_task
- start_task >> other_ns_task
- start_task >> sidecar_task
- start_task >> task_with_template
+ start_task >> volume_task >> third_task
+ start_task >> other_ns_task
+ start_task >> sidecar_task
+ start_task >> task_with_template
+except ImportError as e:
+ log.warning("Could not import DAGs in
example_kubernetes_executor_config.py: %s", str(e))
+ log.warning("Install kubernetes dependencies with: pip install
apache-airflow['kubernetes']")
Review comment:
Shall we use `cncf.kubernetes` instead, given that the `kubernetes` extra is
planned to be deprecated in 2.1? See:
https://github.com/sekikn/airflow/blob/41cf172d6af1a5406367e179df763ef1992e4108/setup.py#L604
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]