jedcunningham commented on a change in pull request #19355:
URL: https://github.com/apache/airflow/pull/19355#discussion_r742857463



##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -38,57 +47,107 @@ def start_task():
         print_stuff()
 
     # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+    kube_exec_config_special = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9",

Review comment:
      To make this portable, what if we used `[kubernetes] worker_container_repository` and `[kubernetes] worker_container_tag` instead? Then it'd be more likely to work, and we could just have a similar comment in the hypothetical, e.g. "Imagine you need a custom library for only this task: build a new image off of your normal image and provide it here. (We simply use the default image to ensure this example will run.)"
   
   If we wanted it to "always" work, we'd really need to use those configs OR pull the image out of the `pod_template_file`.
   
   I sorta wonder if there is value in having a running example of this vs just documenting it, though.
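   For reference, the suggestion above might look roughly like this sketch. A plain `get()` with the same `(section, key, fallback)` shape stands in for `airflow.configuration.conf.get` so the snippet is self-contained, and the fallback values are illustrative, not Airflow's actual defaults:

```python
# Sketch only: in a real DAG you would call airflow.configuration.conf.get();
# this stand-in keeps the example runnable without an Airflow install.
_FAKE_AIRFLOW_CFG = {
    ("kubernetes", "worker_container_repository"): "apache/airflow",
    ("kubernetes", "worker_container_tag"): "2.2.1",
}

def get(section: str, key: str, fallback: str = "") -> str:
    """Stand-in for conf.get(section, key, fallback=...)."""
    return _FAKE_AIRFLOW_CFG.get((section, key), fallback)

# Imagine you need a custom library for only this task: build a new image off
# of your normal image and provide it here. (We simply use the configured
# worker image so the example runs anywhere.)
repository = get("kubernetes", "worker_container_repository")
tag = get("kubernetes", "worker_container_tag")
worker_image = f"{repository}:{tag}"
```

   The resulting `worker_image` would then be passed as `image=` in the `V1Container` override instead of a hard-coded tag.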

##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -38,57 +47,107 @@ def start_task():
         print_stuff()
 
     # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+    kube_exec_config_special = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9",
+                    ),
+                ]
+            )
+        )
+    }
 
     @task(executor_config=kube_exec_config_special)
     def one_task():
         print_stuff()
 
     # Use the zip binary, which is only found in this special docker image
-    kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
+    kube_exec_config_zip_binary = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9-zip",
+                    ),
+                ]
+            )
+        )
+    }
 
     @task(task_id="two_task", executor_config=kube_exec_config_zip_binary)
     def assert_zip_binary():
         """
         Checks whether Zip is installed.
-
         :raises SystemError: if zip is not installed
         """
         return_code = os.system("zip")
         if return_code != 0:
             raise SystemError("The zip binary is not found")
 
     # Limit resources on this operator/task with node affinity & tolerations
-    affinity = {
-        'podAntiAffinity': {
-            'requiredDuringSchedulingIgnoredDuringExecution': [
-                {
-                    'topologyKey': 'kubernetes.io/hostname',
-                    'labelSelector': {
-                        'matchExpressions': [{'key': 'app', 'operator': 'In', 'values': ['airflow']}]
-                    },
-                }
+    # Use k8s_client.V1Affinity to define node affinity & tolerations

Review comment:
      ```suggestion
      # Use k8s_client.V1Affinity to define node affinity
      ```

##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -38,57 +47,107 @@ def start_task():
         print_stuff()
 
     # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+    kube_exec_config_special = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9",
+                    ),
+                ]
+            )
+        )
+    }
 
     @task(executor_config=kube_exec_config_special)
     def one_task():
         print_stuff()
 
     # Use the zip binary, which is only found in this special docker image
-    kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
+    kube_exec_config_zip_binary = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9-zip",

Review comment:
      I'm not a fan of this example. It can't be done in a portable way, nor do I think we should publish a `zip` image just for an example DAG. Did you try the env var check example I mentioned yesterday instead?
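   For reference, that env var alternative could be sketched like this (an assumption, not code from the PR: `TEST_ENV_VAR` and the helper name are made up; in the DAG the variable would be set via `k8s.V1EnvVar` in the `pod_override`):

```python
import os

# Hypothetical sketch: instead of a custom image carrying `zip`, the
# executor_config would set an env var on the "base" container, e.g.
#   k8s.V1Container(name="base", env=[k8s.V1EnvVar(name="TEST_ENV_VAR", value="overridden")])
# and the task asserts on it, so the stock worker image works unchanged.
def assert_env_var_overridden(environ=None):
    """Raise if the env var supposedly set through pod_override is missing."""
    environ = os.environ if environ is None else environ
    if environ.get("TEST_ENV_VAR") != "overridden":
        raise SystemError("TEST_ENV_VAR was not overridden by executor_config")

# Simulating what the task would see inside the overridden pod:
assert_env_var_overridden({"TEST_ENV_VAR": "overridden"})
```

   The check needs nothing beyond the default image, which is the portability point being made above.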

##########
File path: airflow/example_dags/example_kubernetes_executor.py
##########
@@ -38,57 +47,107 @@ def start_task():
         print_stuff()
 
     # But you can if you want to
-    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+    kube_exec_config_special = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9",
+                    ),
+                ]
+            )
+        )
+    }
 
     @task(executor_config=kube_exec_config_special)
     def one_task():
         print_stuff()
 
     # Use the zip binary, which is only found in this special docker image
-    kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
+    kube_exec_config_zip_binary = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        # Please note that the image here must be similar to the one in your Dockerfile.
+                        # Using, for example, ubuntu:latest as base image
+                        # here would fail because it doesn't have `airflow` commands.
+                        # A scenario where changing the base image could be needed is if you have a
+                        # special library that you want to run only in this task. In that case,
+                        # you build the image with the special library, on top the image in your Dockerfile.
+                        image="apache/airflow:2.2.1-python3.9-zip",
+                    ),
+                ]
+            )
+        )
+    }
 
     @task(task_id="two_task", executor_config=kube_exec_config_zip_binary)
     def assert_zip_binary():
         """
         Checks whether Zip is installed.
-
         :raises SystemError: if zip is not installed
         """
         return_code = os.system("zip")
         if return_code != 0:
             raise SystemError("The zip binary is not found")
 
     # Limit resources on this operator/task with node affinity & tolerations
-    affinity = {
-        'podAntiAffinity': {
-            'requiredDuringSchedulingIgnoredDuringExecution': [
-                {
-                    'topologyKey': 'kubernetes.io/hostname',
-                    'labelSelector': {
-                        'matchExpressions': [{'key': 'app', 'operator': 'In', 'values': ['airflow']}]
-                    },
-                }
+    # Use k8s_client.V1Affinity to define node affinity & tolerations
+    k8s_affinity = k8s.V1Affinity(
+        pod_anti_affinity=k8s.V1PodAntiAffinity(
+            required_during_scheduling_ignored_during_execution=[
+                k8s.V1PodAffinityTerm(
+                    label_selector=k8s.V1LabelSelector(
+                        match_expressions=[
+                            k8s.V1LabelSelectorRequirement(key='app', operator='In', values=['airflow'])
+                        ]
+                    ),
+                    topology_key='kubernetes.io/hostname',
+                )
             ]
-        }
-    }
+        )
+    )
 
-    tolerations = [{'key': 'dedicated', 'operator': 'Equal', 'value': 'airflow'}]
+    # Use k8s_client.V1Toleration to define node tolerations
+    k8s_tolerations = [k8s.V1Toleration(key='dedicated', operator='Equal', value='airflow')]
 
-    kube_exec_config_resource_limits = {
-        "KubernetesExecutor": {
-            "request_memory": "128Mi",
-            "limit_memory": "128Mi",
-            "tolerations": tolerations,
-            "affinity": affinity,
-        }
+    # Use k8s_client.V1ResourceRequirements to define resource limits
+    k8s_resource_limits = k8s.V1ResourceRequirements(requests={'memory': '128Mi'}, limits={'memory': '128Mi'})
+    # use k8s_client.V1PodSpec to define pod spec with affinity, tolerations, and resource limits
+    k8s_exec_config_resources = {

Review comment:
       We should change this name, as it's not just resources.
   
   Also, I think we should reorder these examples. I have to imagine resources 
are used significantly more than custom worker images (which probably should be 
the last example).
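   A sketch of what the rename could look like (the name `kube_exec_config_pod_overrides` is made up, and plain dicts stand in for the `k8s.*` model objects so the snippet is self-contained):

```python
# Stand-ins for the k8s.* model objects built earlier in the DAG:
#   k8s.V1Affinity(...), [k8s.V1Toleration(...)], k8s.V1ResourceRequirements(...)
k8s_affinity = {"podAntiAffinity": "..."}
k8s_tolerations = [{"key": "dedicated", "operator": "Equal", "value": "airflow"}]
k8s_resource_limits = {"requests": {"memory": "128Mi"}, "limits": {"memory": "128Mi"}}

# Renamed from k8s_exec_config_resources: the config carries affinity and
# tolerations as well, so a broader name avoids implying it is resources-only.
kube_exec_config_pod_overrides = {
    "pod_override": {  # stand-in for k8s.V1Pod(spec=k8s.V1PodSpec(...))
        "affinity": k8s_affinity,
        "tolerations": k8s_tolerations,
        "resources": k8s_resource_limits,
    }
}
```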

##########
File path: airflow/example_dags/example_kubernetes_executor_config.py
##########
@@ -23,149 +23,156 @@
 from datetime import datetime
 
 from airflow import DAG
+from airflow.configuration import AIRFLOW_HOME
 from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
-from airflow.settings import AIRFLOW_HOME
 
 log = logging.getLogger(__name__)
 
 try:
     from kubernetes.client import models as k8s
-
-    with DAG(
-        dag_id='example_kubernetes_executor_config',
-        schedule_interval=None,
-        start_date=datetime(2021, 1, 1),
-        catchup=False,
-        tags=['example3'],
-    ) as dag:
-        # You can use annotations on your kubernetes pods!
-        start_task_executor_config = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
-        }
-
-        @task(executor_config=start_task_executor_config)
-        def start_task():
-            print_stuff()
-
-        start_task = start_task()
-
-        # [START task_with_volume]
-        executor_config_volume_mount = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[
-                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
-                            ],
-                        )
-                    ],
-                    volumes=[
-                        k8s.V1Volume(
-                            name="example-kubernetes-test-volume",
-                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
-                        )
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_volume_mount)
-        def test_volume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            with open('/foo/volume_mount_test.txt', 'w') as foo:
-                foo.write('Hello')
-
-            return_code = os.system("cat /foo/volume_mount_test.txt")
-            if return_code != 0:
-                raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-
-        volume_task = test_volume_mount()
-        # [END task_with_volume]
-
-        # [START task_with_template]
-        executor_config_template = {
-            "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
-        }
-
-        @task(executor_config=executor_config_template)
-        def task_with_template():
-            print_stuff()
-
-        task_with_template = task_with_template()
-        # [END task_with_template]
-
-        # [START task_with_sidecar]
-        executor_config_sidecar = {
-            "pod_override": k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[
-                        k8s.V1Container(
-                            name="base",
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                        k8s.V1Container(
-                            name="sidecar",
-                            image="ubuntu",
-                            args=["echo \"retrieved from mount\" > /shared/test.txt"],
-                            command=["bash", "-cx"],
-                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
-                        ),
-                    ],
-                    volumes=[
-                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
-                    ],
-                )
-            ),
-        }
-
-        @task(executor_config=executor_config_sidecar)
-        def test_sharedvolume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            for i in range(5):
-                try:
-                    return_code = os.system("cat /shared/test.txt")
-                    if return_code != 0:
-                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-                except ValueError as e:
-                    if i > 4:
-                        raise e
-
-        sidecar_task = test_sharedvolume_mount()
-        # [END task_with_sidecar]
-
-        # Test that we can add labels to pods
-        executor_config_non_root = {
-            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
-        }
-
-        @task(executor_config=executor_config_non_root)
-        def non_root_task():
-            print_stuff()
-
-        third_task = non_root_task()
-
-        executor_config_other_ns = {
-            "KubernetesExecutor": {"namespace": "test-namespace", "labels": {"release": "stable"}}
-        }
-
-        @task(executor_config=executor_config_other_ns)
-        def other_namespace_task():
-            print_stuff()
-
-        other_ns_task = other_namespace_task()
-
-        start_task >> volume_task >> third_task
-        start_task >> other_ns_task
-        start_task >> sidecar_task
-        start_task >> task_with_template
 except ImportError as e:
     log.warning("Could not import DAGs in example_kubernetes_executor_config.py: %s", str(e))
-    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")
+    log.warning("Install kubernetes dependencies with: pip install apache-airflow[cncf.kubernetes]")
+
+with DAG(
+    dag_id='example_kubernetes_executor_config',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=['example3'],
+) as dag:
+    # You can use annotations on your kubernetes pods!
+    start_task_executor_config = {
+        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
+    }
+
+    @task(executor_config=start_task_executor_config)
+    def start_task():
+        print_stuff()
+
+    start_task = start_task()
+
+    # [START task_with_volume]
+    executor_config_volume_mount = {
+        "pod_override": k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name="base",
+                        volume_mounts=[
+                            k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
+                        ],
+                    )
+                ],
+                volumes=[
+                    k8s.V1Volume(
+                        name="example-kubernetes-test-volume",
+                        host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
+                    )
+                ],
+            )
+        ),
+    }
+
+    @task(executor_config=executor_config_volume_mount)
+    def test_volume_mount():
+        """
+        Tests whether the volume has been mounted.
+        """
+
+        with open('/foo/volume_mount_test.txt', 'w') as foo:
+            foo.write('Hello')
+
+        return_code = os.system("cat /foo/volume_mount_test.txt")
+        if return_code != 0:
+            raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+
+    volume_task = test_volume_mount()
+    # [END task_with_volume]
+
+    # [START task_with_template]
+    # Be careful when changing the base image in the pod_template_file.
+    # Using, for example, ubuntu:latest as base image
+    # here would fail because it doesn't have `airflow` commands.
+    # A scenario where changing the base image could be needed is if you have a special library that
+    # you want to run only in this task. In that case, you build the image
+    # with the special library, on top the image in your Dockerfile.
+    executor_config_template = {
+        "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/pod_template_file.yaml"),

Review comment:
       This file doesn't exist by default, it just so happens to be where the 
official chart puts it. We also couldn't add a custom pod_template_file that'll 
work in all instances.
   
   I think we should move this example into the docs so we don't actually try 
and run it.
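   If the example were kept executable rather than moved to the docs, one way to avoid hard failures would be to probe for the file first (a sketch; the path handling mirrors the diff, but the guard itself and the `AIRFLOW_HOME` fallback are assumptions):

```python
import os

# AIRFLOW_HOME resolution here is a simplification of what
# airflow.configuration provides; the existence guard is the point.
AIRFLOW_HOME = os.environ.get("AIRFLOW_HOME", os.path.expanduser("~/airflow"))
template_path = os.path.join(AIRFLOW_HOME, "pod_templates", "pod_template_file.yaml")

# Only build the executor_config when the chart-provided template exists;
# otherwise the example task would be skipped or documented instead of run.
executor_config_template = None
if os.path.exists(template_path):
    executor_config_template = {
        "pod_template_file": template_path,
        # plus a "pod_override" merged on top of the template, as in the diff
    }
```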



