lexey-e-shelf opened a new issue, #37577: URL: https://github.com/apache/airflow/issues/37577
### Apache Airflow version

2.8.1

### If "Other Airflow 2 version" selected, which one?

_No response_

### What happened?

I tried to pass the result of an `expand`ed `@task.kubernetes` task to another `@task.kubernetes` task, and it failed.

### What you think should happen instead?

`@task.kubernetes` tasks should be able to handle `LazyXComAccess` objects with the following constraints:

- The task pods remain independent of Airflow [so that Airflow can act as the job orchestrator](https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/operators.html#how-does-this-operator-work:~:text=The%20KubernetesPodOperator%20enables,are%20written%20in.)
- No Airflow-dependent tasks (e.g., `PythonOperator`) are necessary for middleman eager-loading of `LazyXComAccess` objects

### How to reproduce

- Run Airflow using the image built from the Dockerfile
- Run the DAG

<details>
<summary>DAG definition</summary>

```python
"""
Based on the example DAG demonstrating the usage of dynamic task mapping.
Found here: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html#simple-mapping
"""
from __future__ import annotations

from datetime import datetime

from airflow.decorators import task, dag


@dag(dag_id="example_dynamic_task_mapping_with_k8s", start_date=datetime(2022, 3, 4), schedule_interval="*/1 * * * *")
def example_dynamic_task_mapping():

    # The "add_one" variants
    @task.kubernetes(task_id="add_one_k8s", image="python:3.11-slim-bookworm", do_xcom_push=True)
    def add_one_k8s(x: int) -> int:
        return x + 1

    @task(task_id="add_one_python")
    def add_one_python(x: int) -> int:
        return x + 1

    # The "sum_it" variants
    @task(task_id="sum_it_python")
    def sum_it_python(values) -> None:
        total = sum(values)
        print(f"Total was {total}")

    @task.kubernetes(task_id="sum_it_k8s", image="python:3.11-slim-bookworm", do_xcom_push=True)
    def sum_it_k8s(values) -> None:
        total = sum(values)
        print(f"Total was {total}")

    @task.kubernetes(task_id="sum_it_k8s_airflow", image="apache/airflow:2.8.1-python3.11", do_xcom_push=True)
    def sum_it_k8s_airflow(values) -> None:
        total = sum(values)
        print(f"Total was {total}")

    @task.kubernetes(
        task_id="sum_it_k8s_airflow_config",
        image="apache/airflow:2.8.1-python3.11",
        do_xcom_push=True,
        env_vars={"AIRFLOW__DATABASE__SQL_ALCHEMY_CONN": "postgresql+psycopg2://postgres:[email protected]:6432/airflow"},
    )
    def sum_it_k8s_airflow_config(values) -> None:
        total = sum(values)
        print(f"Total was {total}")

    @task.kubernetes(task_id="sum_it_k8s_lazyevaluated", image="python:3.11-slim-bookworm", do_xcom_push=True)
    def sum_it_k8s_lazyevaluated(values) -> None:
        total = sum(values)
        print(f"Total was {total}")

    @task(task_id="evaluate_lazy_xcoms_python")
    def evaluate_lazy_xcoms(values: list[int]) -> list[int]:
        return list[int](values)

    # The "sum_it" variants for the python "add_one" task
    added_values_python = add_one_python.expand(x=[1, 2, 3])
    sum_it_python(added_values_python)
    sum_it_k8s(added_values_python)
    sum_it_k8s_airflow(added_values_python)
    sum_it_k8s_airflow_config(added_values_python)
    sum_it_k8s_lazyevaluated(evaluate_lazy_xcoms(added_values_python))

    # The "sum_it" variants for the k8s "add_one" task
    added_values_k8s = add_one_k8s.expand(x=[4, 5, 6])
    sum_it_python(added_values_k8s)
    sum_it_k8s(added_values_k8s)
    sum_it_k8s_airflow(added_values_k8s)
    sum_it_k8s_airflow_config(added_values_k8s)
    sum_it_k8s_lazyevaluated(evaluate_lazy_xcoms(added_values_k8s))


example_dynamic_task_mapping()
```

</details>

<details>
<summary>Results</summary>

<details>
<summary>sum_it_python</summary>

```log
airflow-worker-0.airflow-worker.default.svc.cluster.local
*** Found logs served from host http://airflow-worker-0.airflow-worker.default.svc.cluster.local:8793/log/dag_id=example_dynamic_task_mapping_with_k8s/run_id=scheduled__2024-02-21T00:07:00+00:00/task_id=sum_it_python/attempt=1.log
[2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_python scheduled__2024-02-21T00:07:00+00:00 [queued]>
[2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_python scheduled__2024-02-21T00:07:00+00:00 [queued]>
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2170} INFO - Starting attempt 1 of 1
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2191} INFO - Executing <Task(_PythonDecoratedOperator): sum_it_python> on 2024-02-21 00:07:00+00:00
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:60} INFO - Started process 3063 to run task
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'example_dynamic_task_mapping_with_k8s', 'sum_it_python', 'scheduled__2024-02-21T00:07:00+00:00', '--job-id', '53', '--raw', '--subdir', 'DAGS_FOLDER/example-dag-k8s-dynamic-task-mapping.py', '--cfg-path', '/tmp/tmphlpc531x']
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:88} INFO - Job 53: Subtask sum_it_python
[2024-02-21, 00:08:44 UTC] {task_command.py:423} INFO - Running <TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_python scheduled__2024-02-21T00:07:00+00:00 [running]> on host ***-worker-0.***-worker.default.svc.cluster.local
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2480} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='example_dynamic_task_mapping_with_k8s' AIRFLOW_CTX_TASK_ID='sum_it_python' AIRFLOW_CTX_EXECUTION_DATE='2024-02-21T00:07:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-21T00:07:00+00:00'
[2024-02-21, 00:08:44 UTC] {logging_mixin.py:188} INFO - Total was 9
[2024-02-21, 00:08:44 UTC] {python.py:201} INFO - Done. Returned value was: None
[2024-02-21, 00:08:44 UTC] {taskinstance.py:1138} INFO - Marking task as SUCCESS. dag_id=example_dynamic_task_mapping_with_k8s, task_id=sum_it_python, execution_date=20240221T000700, start_date=20240221T000844, end_date=20240221T000844
[2024-02-21, 00:08:44 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 0
[2024-02-21, 00:08:44 UTC] {taskinstance.py:3280} INFO - 0 downstream tasks scheduled from follow-on schedule check
```

</details>

<details>
<summary>sum_it_k8s</summary>

```log
airflow-worker-0.airflow-worker.default.svc.cluster.local
*** Found logs served from host http://airflow-worker-0.airflow-worker.default.svc.cluster.local:8793/log/dag_id=example_dynamic_task_mapping_with_k8s/run_id=scheduled__2024-02-21T00:07:00+00:00/task_id=sum_it_k8s/attempt=1.log
[2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s scheduled__2024-02-21T00:07:00+00:00 [queued]>
[2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s scheduled__2024-02-21T00:07:00+00:00 [queued]>
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2170} INFO - Starting attempt 1 of 1
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2191} INFO - Executing <Task(_KubernetesDecoratedOperator): sum_it_k8s> on 2024-02-21 00:07:00+00:00
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:60} INFO - Started process 3062 to run task
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'example_dynamic_task_mapping_with_k8s', 'sum_it_k8s', 'scheduled__2024-02-21T00:07:00+00:00', '--job-id', '52', '--raw', '--subdir', 'DAGS_FOLDER/example-dag-k8s-dynamic-task-mapping.py', '--cfg-path', '/tmp/tmpwibfdck4']
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:88} INFO - Job 52: Subtask sum_it_k8s
[2024-02-21, 00:08:44 UTC] {task_command.py:423} INFO - Running <TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s scheduled__2024-02-21T00:07:00+00:00 [running]> on host ***-worker-0.***-worker.default.svc.cluster.local
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2480} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='example_dynamic_task_mapping_with_k8s' AIRFLOW_CTX_TASK_ID='sum_it_k8s' AIRFLOW_CTX_EXECUTION_DATE='2024-02-21T00:07:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-21T00:07:00+00:00'
[2024-02-21, 00:08:44 UTC] {warnings.py:109} WARNING - /home/***/.local/lib/python3.11/site-packages/***/providers/cncf/kubernetes/operators/pod.py:1036: AirflowProviderDeprecationWarning: This function is deprecated. Please use `add_unique_suffix`.
  pod.metadata.name = add_pod_suffix(pod_name=pod.metadata.name)
[2024-02-21, 00:08:44 UTC] {pod.py:1055} INFO - Building pod k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip with labels: {'dag_id': 'example_dynamic_task_mapping_with_k8s', 'task_id': 'sum_it_k8s', 'run_id': 'scheduled__2024-02-21T0007000000-7c62a84f4', 'kubernetes_pod_operator': 'True', 'try_number': '1'}
[2024-02-21, 00:08:44 UTC] {pod.py:526} INFO - Found matching pod k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip with labels {'airflow_kpo_in_cluster': 'True', 'airflow_version': '2.8.1', 'dag_id': 'example_dynamic_task_mapping_with_k8s', 'kubernetes_pod_operator': 'True', 'run_id': 'scheduled__2024-02-21T0007000000-7c62a84f4', 'task_id': 'sum_it_k8s', 'try_number': '1'}
[2024-02-21, 00:08:44 UTC] {pod.py:527} INFO - `try_number` of task_instance: 1
[2024-02-21, 00:08:44 UTC] {pod.py:528} INFO - `try_number` of pod: 1
[2024-02-21, 00:08:44 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip
[2024-02-21, 00:08:46 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip
[2024-02-21, 00:08:47 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip
[2024-02-21, 00:08:48 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip
[2024-02-21, 00:08:49 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip
[2024-02-21, 00:08:50 UTC] {pod_manager.py:466} INFO - [base] + python -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_SCRIPT"]);f = open("/tmp/script.py", "wb"); f.write(x); f.close()'
[2024-02-21, 00:08:50 UTC] {pod_manager.py:466} INFO - [base] + python -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_INPUT"]);f = open("/tmp/script.in", "wb"); f.write(x); f.close()'
[2024-02-21, 00:08:50 UTC] {pod_manager.py:466} INFO - [base] + mkdir -p /***/xcom
[2024-02-21, 00:08:50 UTC] {pod_manager.py:466} INFO - [base] + python /tmp/script.py /tmp/script.in /***/xcom/return.json
[2024-02-21, 00:08:50 UTC] {pod_manager.py:466} INFO - [base] Traceback (most recent call last):
[2024-02-21, 00:08:50 UTC] {pod_manager.py:466} INFO - [base] File "/tmp/script.py", line 16, in <module>
[2024-02-21, 00:08:50 UTC] {pod_manager.py:466} INFO - [base] arg_dict = pickle.load(file)
[2024-02-21, 00:08:50 UTC] {pod_manager.py:466} INFO - [base] ^^^^^^^^^^^^^^^^^
[2024-02-21, 00:08:50 UTC] {pod_manager.py:483} INFO - [base] ModuleNotFoundError: No module named '***'
[2024-02-21, 00:08:50 UTC] {pod_manager.py:718} INFO - Checking if xcom sidecar container is started.
[2024-02-21, 00:08:50 UTC] {pod_manager.py:721} INFO - The xcom sidecar container is started.
[2024-02-21, 00:08:50 UTC] {pod_manager.py:798} INFO - Running command... if [ -s /***/xcom/return.json ]; then cat /***/xcom/return.json; else echo __***_xcom_result_empty__; fi
[2024-02-21, 00:08:50 UTC] {pod_manager.py:798} INFO - Running command... kill -s SIGINT 1
[2024-02-21, 00:08:50 UTC] {pod.py:559} INFO - xcom result file is empty.
[2024-02-21, 00:08:50 UTC] {pod_manager.py:616} INFO - Pod k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip has phase Running
[2024-02-21, 00:08:52 UTC] {pod.py:909} INFO - Deleting pod: k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip
[2024-02-21, 00:08:52 UTC] {taskinstance.py:2698} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/decorators/kubernetes.py", line 128, in execute
    return super().execute(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/decorators/base.py", line 241, in execute
    return_value = super().execute(context)
                   ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 570, in execute
    return self.execute_sync(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 629, in execute_sync
    self.cleanup(
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 839, in cleanup
    raise AirflowException(
airflow.exceptions.AirflowException: Pod k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip returned a failure. remote_pod: {'api_version': 'v1', 'kind': 'Pod', 'metadata': {'annotations': None, ... 'start_time': datetime.datetime(2024, 2, 21, 0, 8, 44, tzinfo=tzlocal())}}
[2024-02-21, 00:08:52 UTC] {taskinstance.py:1138} INFO - Marking task as FAILED. dag_id=example_dynamic_task_mapping_with_k8s, task_id=sum_it_k8s, execution_date=20240221T000700, start_date=20240221T000844, end_date=20240221T000852
[2024-02-21, 00:08:52 UTC] {standard_task_runner.py:107} ERROR - Failed to execute job 52 for task sum_it_k8s (Pod k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip returned a failure. remote_pod: {'api_version': 'v1', 'kind': 'Pod', 'metadata': {'annotations': None, 'creation_timestamp': datetime.datetime(2024, 2, 21, 0, 8, 44, tzinfo=tzlocal()), ... 'start_time': datetime.datetime(2024, 2, 21, 0, 8, 44, tzinfo=tzlocal())}}; 3062)
[2024-02-21, 00:08:52 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-02-21, 00:08:52 UTC] {taskinstance.py:3280} INFO - 0 downstream tasks scheduled from follow-on schedule check
```

</details>

<details>
<summary>sum_it_k8s_airflow</summary>

```log
airflow-worker-0.airflow-worker.default.svc.cluster.local
*** Found logs served from host http://airflow-worker-0.airflow-worker.default.svc.cluster.local:8793/log/dag_id=example_dynamic_task_mapping_with_k8s/run_id=scheduled__2024-02-21T00:07:00+00:00/task_id=sum_it_k8s_airflow/attempt=1.log
[2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s_airflow scheduled__2024-02-21T00:07:00+00:00 [queued]>
[2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s_airflow scheduled__2024-02-21T00:07:00+00:00 [queued]>
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2170} INFO - Starting attempt 1 of 1
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2191} INFO - Executing <Task(_KubernetesDecoratedOperator): sum_it_k8s_airflow> on 2024-02-21 00:07:00+00:00
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:60} INFO - Started process 3044 to run task
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'example_dynamic_task_mapping_with_k8s', 'sum_it_k8s_***', 'scheduled__2024-02-21T00:07:00+00:00', '--job-id', '49', '--raw', '--subdir', 'DAGS_FOLDER/example-dag-k8s-dynamic-task-mapping.py', '--cfg-path', '/tmp/tmp7k0l0mmp']
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:88} INFO - Job 49: Subtask sum_it_k8s_***
[2024-02-21, 00:08:44 UTC] {task_command.py:423} INFO - Running <TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s_airflow scheduled__2024-02-21T00:07:00+00:00 [running]> on host ***-worker-0.***-worker.default.svc.cluster.local
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2480} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='example_dynamic_task_mapping_with_k8s' AIRFLOW_CTX_TASK_ID='sum_it_k8s_***' AIRFLOW_CTX_EXECUTION_DATE='2024-02-21T00:07:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-21T00:07:00+00:00'
[2024-02-21, 00:08:44 UTC] {warnings.py:109} WARNING - /home/***/.local/lib/python3.11/site-packages/***/providers/cncf/kubernetes/operators/pod.py:1036: AirflowProviderDeprecationWarning: This function is deprecated. Please use `add_unique_suffix`.
  pod.metadata.name = add_pod_suffix(pod_name=pod.metadata.name)
[2024-02-21, 00:08:44 UTC] {pod.py:1055} INFO - Building pod k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t with labels: {'dag_id': 'example_dynamic_task_mapping_with_k8s', 'task_id': 'sum_it_k8s_***', 'run_id': 'scheduled__2024-02-21T0007000000-7c62a84f4', 'kubernetes_pod_operator': 'True', 'try_number': '1'}
[2024-02-21, 00:08:44 UTC] {pod.py:526} INFO - Found matching pod k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t with labels {'airflow_kpo_in_cluster': 'True', 'airflow_version': '2.8.1', 'dag_id': 'example_dynamic_task_mapping_with_k8s', 'kubernetes_pod_operator': 'True', 'run_id': 'scheduled__2024-02-21T0007000000-7c62a84f4', 'task_id': 'sum_it_k8s_***', 'try_number': '1'}
[2024-02-21, 00:08:44 UTC] {pod.py:527} INFO - `try_number` of task_instance: 1
[2024-02-21, 00:08:44 UTC] {pod.py:528} INFO - `try_number` of pod: 1
[2024-02-21, 00:08:44 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t
[2024-02-21, 00:08:45 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t
[2024-02-21, 00:08:46 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t
[2024-02-21, 00:08:47 UTC] {pod_manager.py:466} INFO - [base] + python -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_SCRIPT"]);f = open("/tmp/script.py", "wb"); f.write(x); f.close()'
[2024-02-21, 00:08:47 UTC] {pod_manager.py:466} INFO - [base] + python -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_INPUT"]);f = open("/tmp/script.in", "wb"); f.write(x); f.close()'
[2024-02-21, 00:08:47 UTC] {pod_manager.py:466} INFO - [base] + mkdir -p /***/xcom
2024-02-21T00:08:49.598549087Z
2024-02-21T00:08:49.598558379Z
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] + python /tmp/script.py /tmp/script.in /***/xcom/return.json
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] Traceback (most recent call last):
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] self.dialect.do_execute(
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] cursor.execute(statement, parameters)
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] sqlite3.OperationalError: no such table: xcom
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] The above exception was the direct cause of the following exception:
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] Traceback (most recent call last):
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/tmp/script.py", line 24, in <module>
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] res = sum_it_k8s_***(*arg_dict["args"], **arg_dict["kwargs"])
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/tmp/script.py", line 21, in sum_it_k8s_***
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] total = sum(values)
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] ^^^^^^^^^^^
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/***/models/xcom.py", line 721, in __next__
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] return XCom.deserialize_value(next(self._it))
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] ^^^^^^^^^^^^^^
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/orm/query.py", line 2901, in __iter__
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] result = self._iter()
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] ^^^^^^^^^^^^
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/orm/query.py", line 2916, in _iter
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] result = self.session.execute(
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] ^^^^^^^^^^^^^^^^^^^^^
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1717, in execute
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] result = conn._execute_20(statement, params or {}, execution_options)
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] return meth(self, args_10style, kwargs_10style, execution_options)
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] return connection._execute_clauseelement(
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] ret = self._execute_context(
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] ^^^^^^^^^^^^^^^^^^^^^^
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] self._handle_dbapi_exception(
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] util.raise_(
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] raise exception
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] self.dialect.do_execute(
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] File "/home/***/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] cursor.execute(statement, parameters)
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: xcom
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] [SQL: SELECT xcom.value
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] FROM xcom JOIN dag_run ON xcom.dag_run_id = dag_run.id
[2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] WHERE xcom.key = 'return_value' AND xcom.task_id = 'add_one_python' AND xcom.dag_id = 'example_dynamic_task_mapping_with_k8s' AND xcom.run_id = 'scheduled__2024-02-21T00:07:00+00:00' ORDER BY xcom.map_index ASC]
[2024-02-21, 00:08:50 UTC] {pod_manager.py:483} INFO - [base] (Background on this error at: https://sqlalche.me/e/14/e3q8)
[2024-02-21, 00:08:50 UTC] {pod_manager.py:718} INFO - Checking if xcom sidecar container is started.
[2024-02-21, 00:08:50 UTC] {pod_manager.py:721} INFO - The xcom sidecar container is started.
[2024-02-21, 00:08:50 UTC] {pod_manager.py:798} INFO - Running command... if [ -s /***/xcom/return.json ]; then cat /***/xcom/return.json; else echo __***_xcom_result_empty__; fi
[2024-02-21, 00:08:50 UTC] {pod_manager.py:798} INFO - Running command... kill -s SIGINT 1
[2024-02-21, 00:08:51 UTC] {pod.py:559} INFO - xcom result file is empty.
[2024-02-21, 00:08:51 UTC] {pod_manager.py:616} INFO - Pod k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t has phase Running
[2024-02-21, 00:08:53 UTC] {pod_manager.py:616} INFO - Pod k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t has phase Running
[2024-02-21, 00:08:55 UTC] {pod.py:909} INFO - Deleting pod: k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t
[2024-02-21, 00:08:55 UTC] {taskinstance.py:2698} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/decorators/kubernetes.py", line 128, in execute
    return super().execute(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/decorators/base.py", line 241, in execute
    return_value = super().execute(context)
                   ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 570, in execute
    return self.execute_sync(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 629, in execute_sync
    self.cleanup(
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 839, in cleanup
    raise AirflowException(
airflow.exceptions.AirflowException: Pod k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t returned a failure. remote_pod: {'api_version': 'v1', 'kind': 'Pod', 'metadata': {'annotations': None, 'creation_timestamp': datetime.datetime(2024, 2, 21, 0, 8, 44, tzinfo=tzlocal()), ... 'start_time': datetime.datetime(2024, 2, 21, 0, 8, 44, tzinfo=tzlocal())}}; 3044)
[2024-02-21, 00:08:55 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-02-21, 00:08:55 UTC] {taskinstance.py:3280} INFO - 0 downstream tasks scheduled from follow-on schedule check
```

</details>

<details>
<summary>sum_it_k8s_airflow_config</summary>

```log
airflow-worker-0.airflow-worker.default.svc.cluster.local
*** Found logs served from host http://airflow-worker-0.airflow-worker.default.svc.cluster.local:8793/log/dag_id=example_dynamic_task_mapping_with_k8s/run_id=scheduled__2024-02-21T00:07:00+00:00/task_id=sum_it_k8s_airflow_config/attempt=1.log
[2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s_airflow_config scheduled__2024-02-21T00:07:00+00:00 [queued]>
[2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s_airflow_config scheduled__2024-02-21T00:07:00+00:00 [queued]>
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2170} INFO - Starting attempt 1 of 1
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2191} INFO - Executing <Task(_KubernetesDecoratedOperator): sum_it_k8s_airflow_config> on 2024-02-21 00:07:00+00:00
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:60} INFO - Started process 3049 to run task
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'example_dynamic_task_mapping_with_k8s', 'sum_it_k8s_***_config', 'scheduled__2024-02-21T00:07:00+00:00', '--job-id', '51', '--raw', '--subdir', 'DAGS_FOLDER/example-dag-k8s-dynamic-task-mapping.py', '--cfg-path', '/tmp/tmp04503lkh']
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:88} INFO - Job 51: Subtask sum_it_k8s_***_config
[2024-02-21, 00:08:44 UTC] {task_command.py:423} INFO - Running <TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s_airflow_config scheduled__2024-02-21T00:07:00+00:00 [running]> on host ***-worker-0.***-worker.default.svc.cluster.local
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2480} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='example_dynamic_task_mapping_with_k8s' AIRFLOW_CTX_TASK_ID='sum_it_k8s_***_config' AIRFLOW_CTX_EXECUTION_DATE='2024-02-21T00:07:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-21T00:07:00+00:00'
[2024-02-21, 00:08:44 UTC] {warnings.py:109} WARNING - /home/***/.local/lib/python3.11/site-packages/***/providers/cncf/kubernetes/operators/pod.py:1036: AirflowProviderDeprecationWarning: This function is deprecated. Please use `add_unique_suffix`.
  pod.metadata.name = add_pod_suffix(pod_name=pod.metadata.name)
[2024-02-21, 00:08:44 UTC] {pod.py:1055} INFO - Building pod k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k with labels: {'dag_id': 'example_dynamic_task_mapping_with_k8s', 'task_id': 'sum_it_k8s_***_config', 'run_id': 'scheduled__2024-02-21T0007000000-7c62a84f4', 'kubernetes_pod_operator': 'True', 'try_number': '1'}
[2024-02-21, 00:08:44 UTC] {pod.py:526} INFO - Found matching pod k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k with labels {'airflow_kpo_in_cluster': 'True', 'airflow_version': '2.8.1', 'dag_id': 'example_dynamic_task_mapping_with_k8s', 'kubernetes_pod_operator': 'True', 'run_id': 'scheduled__2024-02-21T0007000000-7c62a84f4', 'task_id': 'sum_it_k8s_***_config', 'try_number': '1'}
[2024-02-21, 00:08:44 UTC] {pod.py:527} INFO - `try_number` of task_instance: 1
[2024-02-21, 00:08:44 UTC] {pod.py:528} INFO - `try_number` of pod: 1
[2024-02-21, 00:08:44 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k
[2024-02-21, 00:08:45 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k
[2024-02-21, 00:08:46 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k
[2024-02-21, 00:08:47 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k
[2024-02-21, 00:08:48 UTC] {pod_manager.py:466} INFO - [base] + python -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_SCRIPT"]);f = open("/tmp/script.py", "wb"); f.write(x); f.close()'
[2024-02-21, 00:08:48 UTC] {pod_manager.py:466} INFO - [base] + python -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_INPUT"]);f = open("/tmp/script.in", "wb"); f.write(x); f.close()'
[2024-02-21, 00:08:48 UTC] {pod_manager.py:466} INFO - [base] + mkdir -p /***/xcom
[2024-02-21, 00:08:51 UTC] {pod_manager.py:466} INFO - [base] + python /tmp/script.py /tmp/script.in /***/xcom/return.json
[2024-02-21, 00:08:52 UTC] {pod_manager.py:483} INFO - [base] Total was 9
[2024-02-21, 00:08:52 UTC] {pod_manager.py:511} WARNING - Pod k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k log read interrupted but container base still running
[2024-02-21, 00:08:53 UTC] {pod_manager.py:483} INFO - [base] Total was 9
[2024-02-21, 00:08:53 UTC] {pod_manager.py:718} INFO - Checking if xcom sidecar container is started.
[2024-02-21, 00:08:53 UTC] {pod_manager.py:721} INFO - The xcom sidecar container is started.
[2024-02-21, 00:08:53 UTC] {pod_manager.py:798} INFO - Running command... if [ -s /***/xcom/return.json ]; then cat /***/xcom/return.json; else echo __***_xcom_result_empty__; fi
[2024-02-21, 00:08:53 UTC] {pod_manager.py:798} INFO - Running command... kill -s SIGINT 1
[2024-02-21, 00:08:53 UTC] {pod.py:559} INFO - xcom result file is empty.
[2024-02-21, 00:08:53 UTC] {pod_manager.py:616} INFO - Pod k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k has phase Running
[2024-02-21, 00:08:55 UTC] {pod.py:909} INFO - Deleting pod: k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k
[2024-02-21, 00:08:55 UTC] {taskinstance.py:1138} INFO - Marking task as SUCCESS. dag_id=example_dynamic_task_mapping_with_k8s, task_id=sum_it_k8s_***_config, execution_date=20240221T000700, start_date=20240221T000844, end_date=20240221T000855
[2024-02-21, 00:08:55 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 0
[2024-02-21, 00:08:55 UTC] {taskinstance.py:3280} INFO - 0 downstream tasks scheduled from follow-on schedule check
```

</details>

<details>
<summary>evaluate_lazy_xcoms_python</summary>

```log
airflow-worker-0.airflow-worker.default.svc.cluster.local
*** Found logs served from host http://airflow-worker-0.airflow-worker.default.svc.cluster.local:8793/log/dag_id=example_dynamic_task_mapping_with_k8s/run_id=scheduled__2024-02-21T00:07:00+00:00/task_id=evaluate_lazy_xcoms_python/attempt=1.log
[2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_dynamic_task_mapping_with_k8s.evaluate_lazy_xcoms_python scheduled__2024-02-21T00:07:00+00:00 [queued]>
[2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_dynamic_task_mapping_with_k8s.evaluate_lazy_xcoms_python scheduled__2024-02-21T00:07:00+00:00 [queued]>
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2170} INFO - Starting attempt 1 of 1
[2024-02-21, 00:08:44 UTC] {taskinstance.py:2191} INFO - Executing <Task(_PythonDecoratedOperator): evaluate_lazy_xcoms_python> on 2024-02-21 00:07:00+00:00
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:60} INFO - Started process 3048 to run task
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'example_dynamic_task_mapping_with_k8s', 'evaluate_lazy_xcoms_python', 'scheduled__2024-02-21T00:07:00+00:00', '--job-id', '50', '--raw', '--subdir', 'DAGS_FOLDER/example-dag-k8s-dynamic-task-mapping.py', '--cfg-path', '/tmp/tmps0lkgd19']
[2024-02-21, 00:08:44 UTC] {standard_task_runner.py:88} INFO - Job 50: Subtask
evaluate_lazy_xcoms_python [2024-02-21, 00:08:44 UTC] {task_command.py:423} INFO - Running <TaskInstance: example_dynamic_task_mapping_with_k8s.evaluate_lazy_xcoms_python scheduled__2024-02-21T00:07:00+00:00 [running]> on host ***-worker-0.***-worker.default.svc.cluster.local [2024-02-21, 00:08:44 UTC] {taskinstance.py:2480} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='example_dynamic_task_mapping_with_k8s' AIRFLOW_CTX_TASK_ID='evaluate_lazy_xcoms_python' AIRFLOW_CTX_EXECUTION_DATE='2024-02-21T00:07:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-21T00:07:00+00:00' [2024-02-21, 00:08:44 UTC] {python.py:201} INFO - Done. Returned value was: [2, 3, 4] [2024-02-21, 00:08:44 UTC] {taskinstance.py:1138} INFO - Marking task as SUCCESS. dag_id=example_dynamic_task_mapping_with_k8s, task_id=evaluate_lazy_xcoms_python, execution_date=20240221T000700, start_date=20240221T000844, end_date=20240221T000844 [2024-02-21, 00:08:44 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 0 [2024-02-21, 00:08:44 UTC] {taskinstance.py:3280} INFO - 1 downstream tasks scheduled from follow-on schedule check ``` </details> <details> <summary>sum_it_k8s_lazyevaluated</summary> ```log airflow-worker-0.airflow-worker.default.svc.cluster.local *** Found logs served from host http://airflow-worker-0.airflow-worker.default.svc.cluster.local:8793/log/dag_id=example_dynamic_task_mapping_with_k8s/run_id=scheduled__2024-02-21T00:07:00+00:00/task_id=sum_it_k8s_lazyevaluated/attempt=1.log [2024-02-21, 00:08:45 UTC] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s_lazyevaluated scheduled__2024-02-21T00:07:00+00:00 [queued]> [2024-02-21, 00:08:45 UTC] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s_lazyevaluated 
scheduled__2024-02-21T00:07:00+00:00 [queued]> [2024-02-21, 00:08:45 UTC] {taskinstance.py:2170} INFO - Starting attempt 1 of 1 [2024-02-21, 00:08:45 UTC] {taskinstance.py:2191} INFO - Executing <Task(_KubernetesDecoratedOperator): sum_it_k8s_lazyevaluated> on 2024-02-21 00:07:00+00:00 [2024-02-21, 00:08:45 UTC] {standard_task_runner.py:60} INFO - Started process 3099 to run task [2024-02-21, 00:08:45 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'example_dynamic_task_mapping_with_k8s', 'sum_it_k8s_lazyevaluated', 'scheduled__2024-02-21T00:07:00+00:00', '--job-id', '54', '--raw', '--subdir', 'DAGS_FOLDER/example-dag-k8s-dynamic-task-mapping.py', '--cfg-path', '/tmp/tmph4g0tr_h'] [2024-02-21, 00:08:45 UTC] {standard_task_runner.py:88} INFO - Job 54: Subtask sum_it_k8s_lazyevaluated [2024-02-21, 00:08:45 UTC] {task_command.py:423} INFO - Running <TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s_lazyevaluated scheduled__2024-02-21T00:07:00+00:00 [running]> on host ***-worker-0.***-worker.default.svc.cluster.local [2024-02-21, 00:08:45 UTC] {taskinstance.py:2480} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='example_dynamic_task_mapping_with_k8s' AIRFLOW_CTX_TASK_ID='sum_it_k8s_lazyevaluated' AIRFLOW_CTX_EXECUTION_DATE='2024-02-21T00:07:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-21T00:07:00+00:00' [2024-02-21, 00:08:45 UTC] {warnings.py:109} WARNING - /home/***/.local/lib/python3.11/site-packages/***/providers/cncf/kubernetes/operators/pod.py:1036: AirflowProviderDeprecationWarning: This function is deprecated. Please use `add_unique_suffix`. 
pod.metadata.name = add_pod_suffix(pod_name=pod.metadata.name) [2024-02-21, 00:08:45 UTC] {pod.py:1055} INFO - Building pod k8s-***-pod-1ee5ebb7c4804192aecb46008bb6c0cf-8pch0mt4 with labels: {'dag_id': 'example_dynamic_task_mapping_with_k8s', 'task_id': 'sum_it_k8s_lazyevaluated', 'run_id': 'scheduled__2024-02-21T0007000000-7c62a84f4', 'kubernetes_pod_operator': 'True', 'try_number': '1'} [2024-02-21, 00:08:45 UTC] {pod.py:526} INFO - Found matching pod k8s-***-pod-1ee5ebb7c4804192aecb46008bb6c0cf-8pch0mt4 with labels {'airflow_kpo_in_cluster': 'True', 'airflow_version': '2.8.1', 'dag_id': 'example_dynamic_task_mapping_with_k8s', 'kubernetes_pod_operator': 'True', 'run_id': 'scheduled__2024-02-21T0007000000-7c62a84f4', 'task_id': 'sum_it_k8s_lazyevaluated', 'try_number': '1'} [2024-02-21, 00:08:45 UTC] {pod.py:527} INFO - `try_number` of task_instance: 1 [2024-02-21, 00:08:45 UTC] {pod.py:528} INFO - `try_number` of pod: 1 [2024-02-21, 00:08:45 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-1ee5ebb7c4804192aecb46008bb6c0cf-8pch0mt4 [2024-02-21, 00:08:46 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-1ee5ebb7c4804192aecb46008bb6c0cf-8pch0mt4 [2024-02-21, 00:08:47 UTC] {pod_manager.py:373} WARNING - Pod not yet started: k8s-***-pod-1ee5ebb7c4804192aecb46008bb6c0cf-8pch0mt4 [2024-02-21, 00:08:48 UTC] {pod_manager.py:466} INFO - [base] + python -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_SCRIPT"]);f = open("/tmp/script.py", "wb"); f.write(x); f.close()' [2024-02-21, 00:08:48 UTC] {pod_manager.py:466} INFO - [base] + python -c 'import base64, os;x = base64.b64decode(os.environ["__PYTHON_INPUT"]);f = open("/tmp/script.in", "wb"); f.write(x); f.close()' [2024-02-21, 00:08:48 UTC] {pod_manager.py:466} INFO - [base] + mkdir -p /***/xcom [2024-02-21, 00:08:48 UTC] {pod_manager.py:466} INFO - [base] + python /tmp/script.py /tmp/script.in /***/xcom/return.json [2024-02-21, 00:08:48 UTC] {pod_manager.py:483} INFO - 
[base] Total was 9 [2024-02-21, 00:08:48 UTC] {pod_manager.py:718} INFO - Checking if xcom sidecar container is started. [2024-02-21, 00:08:48 UTC] {pod_manager.py:721} INFO - The xcom sidecar container is started. [2024-02-21, 00:08:48 UTC] {pod_manager.py:798} INFO - Running command... if [ -s /***/xcom/return.json ]; then cat /***/xcom/return.json; else echo __***_xcom_result_empty__; fi [2024-02-21, 00:08:48 UTC] {pod_manager.py:798} INFO - Running command... kill -s SIGINT 1 [2024-02-21, 00:08:49 UTC] {pod.py:559} INFO - xcom result file is empty. [2024-02-21, 00:08:49 UTC] {pod_manager.py:616} INFO - Pod k8s-***-pod-1ee5ebb7c4804192aecb46008bb6c0cf-8pch0mt4 has phase Running [2024-02-21, 00:08:52 UTC] {pod.py:909} INFO - Deleting pod: k8s-***-pod-1ee5ebb7c4804192aecb46008bb6c0cf-8pch0mt4 [2024-02-21, 00:08:52 UTC] {taskinstance.py:1138} INFO - Marking task as SUCCESS. dag_id=example_dynamic_task_mapping_with_k8s, task_id=sum_it_k8s_lazyevaluated, execution_date=20240221T000700, start_date=20240221T000845, end_date=20240221T000852 [2024-02-21, 00:08:52 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 0 [2024-02-21, 00:08:52 UTC] {taskinstance.py:3280} INFO - 0 downstream tasks scheduled from follow-on schedule check ``` </details> </details> ### Operating System macOS 13.4 (22F66) ### Versions of Apache Airflow Providers `apache-airflow-providers-cncf-kubernetes` 8.0.0 ### Deployment Official Apache Airflow Helm Chart ### Deployment details <details> <summary>Host specifications</summary> - MacBook Pro with Apple M2 Pro - macOS 13.4 (22F66) - Darwin 22.5.0 - Local Kubernetes 1.27.7 - [k3d](https://k3d.io/) v5.6.0 using image `rancher/k3s:v1.27.7-k3s1` - Docker Desktop with Rosetta for x86/amd64 emulation on Apple Silicon </details> <details> <summary>Airflow specifications</summary> - Airflow 2.8.1 with Python 3.11 - Based on the `apache/airflow:2.8.1-python3.11` image - Deployed using 
[airflow-helm](https://github.com/airflow-helm/charts) 8.8.0
- `helm version` = version.BuildInfo{Version:"v3.12.3", GitCommit:"3a31588ad33fe3b89af5a2a54ee1d25bfe6eaa5e", GitTreeState:"clean", GoVersion:"go1.20.7"}
- `apache-airflow-providers-cncf-kubernetes` 8.0.0

</details>

<details>
<summary>Dockerfile</summary>

### Dockerfile

```dockerfile
FROM apache/airflow:2.8.1-python3.11
COPY requirements.txt requirements.txt
RUN pip install --upgrade pip
RUN pip install -r requirements.txt
```

### requirements.txt

```txt
apache-airflow-providers-cncf-kubernetes==8.0.0
```

</details>

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
