lexey-e-shelf opened a new issue, #37577:
URL: https://github.com/apache/airflow/issues/37577

   ### Apache Airflow version
   
   2.8.1
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
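   I tried to pass the result of an `expand`ed `@task.kubernetes` task to 
another `@task.kubernetes` task, and the downstream task failed: its pod could 
not load the `LazyXComAccess` input. The logs under Results show the same 
failure when the upstream mapped task is a plain `@task`.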
   
   ### What you think should happen instead?
   
   `@task.kubernetes` tasks should be able to handle `LazyXComAccess` objects 
with the following constraints:
   - The task pods remain independent of Airflow [so that Airflow can act as 
the job 
orchestrator](https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/operators.html#how-does-this-operator-work:~:text=The%20KubernetesPodOperator%20enables,are%20written%20in.)
   - No intermediate Airflow-dependent task (e.g., a `PythonOperator`) should 
be needed solely to eagerly load the `LazyXComAccess` object
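
A minimal sketch of the failure mode that the `sum_it_k8s` log under Results points to (`pickle.load` raising `ModuleNotFoundError` inside the pod). It assumes the task input is pickled on the worker and unpickled in the pod. The names `orchestrator_only`, `LazyValues`, and `load_in_pod` are hypothetical stand-ins, not Airflow APIs: `LazyValues` plays the role of `LazyXComAccess`, and deleting the module from `sys.modules` simulates an image without Airflow installed.

```python
import pickle
import sys
import types

# Hypothetical module that exists only on the orchestrator side.
fake_module = types.ModuleType("orchestrator_only")


class LazyValues:
    """Hypothetical lazy sequence standing in for LazyXComAccess."""

    def __init__(self, values):
        self._values = values

    def __iter__(self):
        return iter(self._values)


# Make pickle record the class as living in the orchestrator-only module.
LazyValues.__module__ = "orchestrator_only"
LazyValues.__qualname__ = "LazyValues"
fake_module.LazyValues = LazyValues
sys.modules["orchestrator_only"] = fake_module

lazy = LazyValues([2, 3, 4])
lazy_payload = pickle.dumps(lazy)         # stores a reference to the class
eager_payload = pickle.dumps(list(lazy))  # stores only builtin types

# Simulate the pod: the orchestrator-only module is not importable there.
del sys.modules["orchestrator_only"]


def load_in_pod(payload):
    """Unpickle the payload as the pod-side script would, reporting failures."""
    try:
        return pickle.loads(payload)
    except ModuleNotFoundError as exc:
        return f"failed: {exc}"


print(load_in_pod(lazy_payload))
print(sum(load_in_pod(eager_payload)))
```

Under these assumptions, only the eagerly materialized list survives the trip into the pod, which is why the `evaluate_lazy_xcoms` task in the DAG below works as a stopgap.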
   
   ### How to reproduce
   
   - Run Airflow using the image built from the Dockerfile
   - Run the DAG
   
   <details>
   <summary>DAG definition</summary>
   
   ```python
   """
   Based on the example DAG demonstrating the usage of dynamic task mapping.
   Found here: 
https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html#simple-mapping
   """
   from __future__ import annotations
   
   from datetime import datetime
   
   from airflow.decorators import task, dag
   
   @dag(dag_id="example_dynamic_task_mapping_with_k8s",
        start_date=datetime(2022, 3, 4),
        schedule="*/1 * * * *")  # `schedule` replaces the deprecated `schedule_interval`
   def example_dynamic_task_mapping():
   
       # The "add_one" variants
       @task.kubernetes(task_id="add_one_k8s",
                        image="python:3.11-slim-bookworm",
                        do_xcom_push=True)
       def add_one_k8s(x: int) -> int:
           return x + 1
       
       @task(task_id="add_one_python")
       def add_one_python(x: int) -> int:
           return x + 1
       
       # The "sum_it" variants
       @task(task_id="sum_it_python")
       def sum_it_python(values) -> None:
           total = sum(values)
           print(f"Total was {total}")
   
       @task.kubernetes(task_id="sum_it_k8s",
                        image="python:3.11-slim-bookworm",
                        do_xcom_push=True)
       def sum_it_k8s(values) -> None:
           total = sum(values)
           print(f"Total was {total}")
   
       @task.kubernetes(task_id="sum_it_k8s_airflow",
                        image="apache/airflow:2.8.1-python3.11",
                        do_xcom_push=True)
       def sum_it_k8s_airflow(values) -> None:
           total = sum(values)
           print(f"Total was {total}")
   
       @task.kubernetes(task_id="sum_it_k8s_airflow_config",
                        image="apache/airflow:2.8.1-python3.11",
                        do_xcom_push=True,
                        env_vars={"AIRFLOW__DATABASE__SQL_ALCHEMY_CONN": 
"postgresql+psycopg2://postgres:[email protected]:6432/airflow"})
       def sum_it_k8s_airflow_config(values) -> None:
           total = sum(values)
           print(f"Total was {total}")
   
       @task.kubernetes(task_id="sum_it_k8s_lazyevaluated",
                        image="python:3.11-slim-bookworm",
                        do_xcom_push=True)
       def sum_it_k8s_lazyevaluated(values) -> None:
           total = sum(values)
           print(f"Total was {total}")
   
       @task(task_id="evaluate_lazy_xcoms_python")
       def evaluate_lazy_xcoms(values: list[int]) -> list[int]:
           # Materialize the lazy XCom sequence into a plain list on the worker
           return list(values)
   
       # The "sum_it" variants for the python "add_one" task
       added_values_python = add_one_python.expand(x=[1, 2, 3])
       sum_it_python(added_values_python)
       sum_it_k8s(added_values_python)
       sum_it_k8s_airflow(added_values_python)
       sum_it_k8s_airflow_config(added_values_python)
       sum_it_k8s_lazyevaluated(evaluate_lazy_xcoms(added_values_python))
   
       # The "sum_it" variants for the k8s "add_one" task
       added_values_k8s = add_one_k8s.expand(x=[4, 5, 6])
       sum_it_python(added_values_k8s)
       sum_it_k8s(added_values_k8s)
       sum_it_k8s_airflow(added_values_k8s)
       sum_it_k8s_airflow_config(added_values_k8s)
       sum_it_k8s_lazyevaluated(evaluate_lazy_xcoms(added_values_k8s))
   
   example_dynamic_task_mapping()
   
   ```
   </details>
   
   <details>
   <summary>Results</summary>
   
   
![KubernetesPodOperator-DynamicTaskMapping-XCom-Failure](https://github.com/apache/airflow/assets/144716231/fbc9ca07-a0b5-46dd-b8fa-87861b8f331b)
   <details>
   <summary>sum_it_python</summary>
   
   ```log
   airflow-worker-0.airflow-worker.default.svc.cluster.local
   *** Found logs served from host 
http://airflow-worker-0.airflow-worker.default.svc.cluster.local:8793/log/dag_id=example_dynamic_task_mapping_with_k8s/run_id=scheduled__2024-02-21T00:07:00+00:00/task_id=sum_it_python/attempt=1.log
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all 
met for dep_context=non-requeueable deps ti=<TaskInstance: 
example_dynamic_task_mapping_with_k8s.sum_it_python 
scheduled__2024-02-21T00:07:00+00:00 [queued]>
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all 
met for dep_context=requeueable deps ti=<TaskInstance: 
example_dynamic_task_mapping_with_k8s.sum_it_python 
scheduled__2024-02-21T00:07:00+00:00 [queued]>
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:2170} INFO - Starting attempt 1 
of 1
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:2191} INFO - Executing 
<Task(_PythonDecoratedOperator): sum_it_python> on 2024-02-21 00:07:00+00:00
   [2024-02-21, 00:08:44 UTC] {standard_task_runner.py:60} INFO - Started 
process 3063 to run task
   [2024-02-21, 00:08:44 UTC] {standard_task_runner.py:87} INFO - Running: 
['***', 'tasks', 'run', 'example_dynamic_task_mapping_with_k8s', 
'sum_it_python', 'scheduled__2024-02-21T00:07:00+00:00', '--job-id', '53', 
'--raw', '--subdir', 'DAGS_FOLDER/example-dag-k8s-dynamic-task-mapping.py', 
'--cfg-path', '/tmp/tmphlpc531x']
   [2024-02-21, 00:08:44 UTC] {standard_task_runner.py:88} INFO - Job 53: 
Subtask sum_it_python
   [2024-02-21, 00:08:44 UTC] {task_command.py:423} INFO - Running 
<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_python 
scheduled__2024-02-21T00:07:00+00:00 [running]> on host 
***-worker-0.***-worker.default.svc.cluster.local
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:2480} INFO - Exporting env vars: 
AIRFLOW_CTX_DAG_OWNER='***' 
AIRFLOW_CTX_DAG_ID='example_dynamic_task_mapping_with_k8s' 
AIRFLOW_CTX_TASK_ID='sum_it_python' 
AIRFLOW_CTX_EXECUTION_DATE='2024-02-21T00:07:00+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-21T00:07:00+00:00'
   [2024-02-21, 00:08:44 UTC] {logging_mixin.py:188} INFO - Total was 9
   [2024-02-21, 00:08:44 UTC] {python.py:201} INFO - Done. Returned value was: 
None
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:1138} INFO - Marking task as 
SUCCESS. dag_id=example_dynamic_task_mapping_with_k8s, task_id=sum_it_python, 
execution_date=20240221T000700, start_date=20240221T000844, 
end_date=20240221T000844
   [2024-02-21, 00:08:44 UTC] {local_task_job_runner.py:234} INFO - Task exited 
with return code 0
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:3280} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   ```
   </details>
   <details>
   <summary>sum_it_k8s</summary>
   
   ```log
   airflow-worker-0.airflow-worker.default.svc.cluster.local
   *** Found logs served from host 
http://airflow-worker-0.airflow-worker.default.svc.cluster.local:8793/log/dag_id=example_dynamic_task_mapping_with_k8s/run_id=scheduled__2024-02-21T00:07:00+00:00/task_id=sum_it_k8s/attempt=1.log
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all 
met for dep_context=non-requeueable deps ti=<TaskInstance: 
example_dynamic_task_mapping_with_k8s.sum_it_k8s 
scheduled__2024-02-21T00:07:00+00:00 [queued]>
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all 
met for dep_context=requeueable deps ti=<TaskInstance: 
example_dynamic_task_mapping_with_k8s.sum_it_k8s 
scheduled__2024-02-21T00:07:00+00:00 [queued]>
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:2170} INFO - Starting attempt 1 
of 1
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:2191} INFO - Executing 
<Task(_KubernetesDecoratedOperator): sum_it_k8s> on 2024-02-21 00:07:00+00:00
   [2024-02-21, 00:08:44 UTC] {standard_task_runner.py:60} INFO - Started 
process 3062 to run task
   [2024-02-21, 00:08:44 UTC] {standard_task_runner.py:87} INFO - Running: 
['***', 'tasks', 'run', 'example_dynamic_task_mapping_with_k8s', 'sum_it_k8s', 
'scheduled__2024-02-21T00:07:00+00:00', '--job-id', '52', '--raw', '--subdir', 
'DAGS_FOLDER/example-dag-k8s-dynamic-task-mapping.py', '--cfg-path', 
'/tmp/tmpwibfdck4']
   [2024-02-21, 00:08:44 UTC] {standard_task_runner.py:88} INFO - Job 52: 
Subtask sum_it_k8s
   [2024-02-21, 00:08:44 UTC] {task_command.py:423} INFO - Running 
<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s 
scheduled__2024-02-21T00:07:00+00:00 [running]> on host 
***-worker-0.***-worker.default.svc.cluster.local
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:2480} INFO - Exporting env vars: 
AIRFLOW_CTX_DAG_OWNER='***' 
AIRFLOW_CTX_DAG_ID='example_dynamic_task_mapping_with_k8s' 
AIRFLOW_CTX_TASK_ID='sum_it_k8s' 
AIRFLOW_CTX_EXECUTION_DATE='2024-02-21T00:07:00+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-21T00:07:00+00:00'
   [2024-02-21, 00:08:44 UTC] {warnings.py:109} WARNING - 
/home/***/.local/lib/python3.11/site-packages/***/providers/cncf/kubernetes/operators/pod.py:1036:
 AirflowProviderDeprecationWarning: This function is deprecated. Please use 
`add_unique_suffix`.
     pod.metadata.name = add_pod_suffix(pod_name=pod.metadata.name)
   [2024-02-21, 00:08:44 UTC] {pod.py:1055} INFO - Building pod 
k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip with labels: {'dag_id': 
'example_dynamic_task_mapping_with_k8s', 'task_id': 'sum_it_k8s', 'run_id': 
'scheduled__2024-02-21T0007000000-7c62a84f4', 'kubernetes_pod_operator': 
'True', 'try_number': '1'}
   [2024-02-21, 00:08:44 UTC] {pod.py:526} INFO - Found matching pod 
k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip with labels 
{'airflow_kpo_in_cluster': 'True', 'airflow_version': '2.8.1', 'dag_id': 
'example_dynamic_task_mapping_with_k8s', 'kubernetes_pod_operator': 'True', 
'run_id': 'scheduled__2024-02-21T0007000000-7c62a84f4', 'task_id': 
'sum_it_k8s', 'try_number': '1'}
   [2024-02-21, 00:08:44 UTC] {pod.py:527} INFO - `try_number` of 
task_instance: 1
   [2024-02-21, 00:08:44 UTC] {pod.py:528} INFO - `try_number` of pod: 1
   [2024-02-21, 00:08:44 UTC] {pod_manager.py:373} WARNING - Pod not yet 
started: k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip
   [2024-02-21, 00:08:46 UTC] {pod_manager.py:373} WARNING - Pod not yet 
started: k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip
   [2024-02-21, 00:08:47 UTC] {pod_manager.py:373} WARNING - Pod not yet 
started: k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip
   [2024-02-21, 00:08:48 UTC] {pod_manager.py:373} WARNING - Pod not yet 
started: k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:373} WARNING - Pod not yet 
started: k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip
   [2024-02-21, 00:08:50 UTC] {pod_manager.py:466} INFO - [base] + python -c 
'import base64, os;x = base64.b64decode(os.environ["__PYTHON_SCRIPT"]);f = 
open("/tmp/script.py", "wb"); f.write(x); f.close()'
   [2024-02-21, 00:08:50 UTC] {pod_manager.py:466} INFO - [base] + python -c 
'import base64, os;x = base64.b64decode(os.environ["__PYTHON_INPUT"]);f = 
open("/tmp/script.in", "wb"); f.write(x); f.close()'
   [2024-02-21, 00:08:50 UTC] {pod_manager.py:466} INFO - [base] + mkdir -p 
/***/xcom
   [2024-02-21, 00:08:50 UTC] {pod_manager.py:466} INFO - [base] + python 
/tmp/script.py /tmp/script.in /***/xcom/return.json
   [2024-02-21, 00:08:50 UTC] {pod_manager.py:466} INFO - [base] Traceback 
(most recent call last):
   [2024-02-21, 00:08:50 UTC] {pod_manager.py:466} INFO - [base]   File 
"/tmp/script.py", line 16, in <module>
   [2024-02-21, 00:08:50 UTC] {pod_manager.py:466} INFO - [base]     arg_dict = 
pickle.load(file)
   [2024-02-21, 00:08:50 UTC] {pod_manager.py:466} INFO - [base]                
^^^^^^^^^^^^^^^^^
   [2024-02-21, 00:08:50 UTC] {pod_manager.py:483} INFO - [base] 
ModuleNotFoundError: No module named '***'
   [2024-02-21, 00:08:50 UTC] {pod_manager.py:718} INFO - Checking if xcom 
sidecar container is started.
   [2024-02-21, 00:08:50 UTC] {pod_manager.py:721} INFO - The xcom sidecar 
container is started.
   [2024-02-21, 00:08:50 UTC] {pod_manager.py:798} INFO - Running command... if 
[ -s /***/xcom/return.json ]; then cat /***/xcom/return.json; else echo 
__***_xcom_result_empty__; fi
   [2024-02-21, 00:08:50 UTC] {pod_manager.py:798} INFO - Running command... 
kill -s SIGINT 1
   [2024-02-21, 00:08:50 UTC] {pod.py:559} INFO - xcom result file is empty.
   [2024-02-21, 00:08:50 UTC] {pod_manager.py:616} INFO - Pod 
k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip has phase Running
   [2024-02-21, 00:08:52 UTC] {pod.py:909} INFO - Deleting pod: 
k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip
   [2024-02-21, 00:08:52 UTC] {taskinstance.py:2698} ERROR - Task failed with 
exception
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py",
 line 433, in _execute_task
       result = execute_callable(context=context, **execute_callable_kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/decorators/kubernetes.py",
 line 128, in execute
       return super().execute(context)
              ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/decorators/base.py", 
line 241, in execute
       return_value = super().execute(context)
                      ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py",
 line 570, in execute
       return self.execute_sync(context)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py",
 line 629, in execute_sync
       self.cleanup(
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py",
 line 839, in cleanup
       raise AirflowException(
   airflow.exceptions.AirflowException: Pod 
k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip returned a failure.
   remote_pod: {'api_version': 'v1',
    'kind': 'Pod',
    'metadata': {'annotations': None,
   ...
               'start_time': datetime.datetime(2024, 2, 21, 0, 8, 44, 
tzinfo=tzlocal())}}
   [2024-02-21, 00:08:52 UTC] {taskinstance.py:1138} INFO - Marking task as 
FAILED. dag_id=example_dynamic_task_mapping_with_k8s, task_id=sum_it_k8s, 
execution_date=20240221T000700, start_date=20240221T000844, 
end_date=20240221T000852
   [2024-02-21, 00:08:52 UTC] {standard_task_runner.py:107} ERROR - Failed to 
execute job 52 for task sum_it_k8s (Pod 
k8s-***-pod-d1de35ce457d4d1e8091af5666f92f06-soeab5ip returned a failure.
   remote_pod: {'api_version': 'v1',
    'kind': 'Pod',
    'metadata': {'annotations': None,
                 'creation_timestamp': datetime.datetime(2024, 2, 21, 0, 8, 44, 
tzinfo=tzlocal()),
   ...
               'start_time': datetime.datetime(2024, 2, 21, 0, 8, 44, 
tzinfo=tzlocal())}}; 3062)
   [2024-02-21, 00:08:52 UTC] {local_task_job_runner.py:234} INFO - Task exited 
with return code 1
   [2024-02-21, 00:08:52 UTC] {taskinstance.py:3280} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   ```
   </details>
   <details>
   <summary>sum_it_k8s_airflow</summary>
   
   ```log
   airflow-worker-0.airflow-worker.default.svc.cluster.local
   *** Found logs served from host 
http://airflow-worker-0.airflow-worker.default.svc.cluster.local:8793/log/dag_id=example_dynamic_task_mapping_with_k8s/run_id=scheduled__2024-02-21T00:07:00+00:00/task_id=sum_it_k8s_airflow/attempt=1.log
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all 
met for dep_context=non-requeueable deps ti=<TaskInstance: 
example_dynamic_task_mapping_with_k8s.sum_it_k8s_airflow 
scheduled__2024-02-21T00:07:00+00:00 [queued]>
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all 
met for dep_context=requeueable deps ti=<TaskInstance: 
example_dynamic_task_mapping_with_k8s.sum_it_k8s_airflow 
scheduled__2024-02-21T00:07:00+00:00 [queued]>
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:2170} INFO - Starting attempt 1 
of 1
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:2191} INFO - Executing 
<Task(_KubernetesDecoratedOperator): sum_it_k8s_airflow> on 2024-02-21 
00:07:00+00:00
   [2024-02-21, 00:08:44 UTC] {standard_task_runner.py:60} INFO - Started 
process 3044 to run task
   [2024-02-21, 00:08:44 UTC] {standard_task_runner.py:87} INFO - Running: 
['***', 'tasks', 'run', 'example_dynamic_task_mapping_with_k8s', 
'sum_it_k8s_***', 'scheduled__2024-02-21T00:07:00+00:00', '--job-id', '49', 
'--raw', '--subdir', 'DAGS_FOLDER/example-dag-k8s-dynamic-task-mapping.py', 
'--cfg-path', '/tmp/tmp7k0l0mmp']
   [2024-02-21, 00:08:44 UTC] {standard_task_runner.py:88} INFO - Job 49: 
Subtask sum_it_k8s_***
   [2024-02-21, 00:08:44 UTC] {task_command.py:423} INFO - Running 
<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s_airflow 
scheduled__2024-02-21T00:07:00+00:00 [running]> on host 
***-worker-0.***-worker.default.svc.cluster.local
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:2480} INFO - Exporting env vars: 
AIRFLOW_CTX_DAG_OWNER='***' 
AIRFLOW_CTX_DAG_ID='example_dynamic_task_mapping_with_k8s' 
AIRFLOW_CTX_TASK_ID='sum_it_k8s_***' 
AIRFLOW_CTX_EXECUTION_DATE='2024-02-21T00:07:00+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-21T00:07:00+00:00'
   [2024-02-21, 00:08:44 UTC] {warnings.py:109} WARNING - 
/home/***/.local/lib/python3.11/site-packages/***/providers/cncf/kubernetes/operators/pod.py:1036:
 AirflowProviderDeprecationWarning: This function is deprecated. Please use 
`add_unique_suffix`.
     pod.metadata.name = add_pod_suffix(pod_name=pod.metadata.name)
   [2024-02-21, 00:08:44 UTC] {pod.py:1055} INFO - Building pod 
k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t with labels: {'dag_id': 
'example_dynamic_task_mapping_with_k8s', 'task_id': 'sum_it_k8s_***', 'run_id': 
'scheduled__2024-02-21T0007000000-7c62a84f4', 'kubernetes_pod_operator': 
'True', 'try_number': '1'}
   [2024-02-21, 00:08:44 UTC] {pod.py:526} INFO - Found matching pod 
k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t with labels 
{'airflow_kpo_in_cluster': 'True', 'airflow_version': '2.8.1', 'dag_id': 
'example_dynamic_task_mapping_with_k8s', 'kubernetes_pod_operator': 'True', 
'run_id': 'scheduled__2024-02-21T0007000000-7c62a84f4', 'task_id': 
'sum_it_k8s_***', 'try_number': '1'}
   [2024-02-21, 00:08:44 UTC] {pod.py:527} INFO - `try_number` of 
task_instance: 1
   [2024-02-21, 00:08:44 UTC] {pod.py:528} INFO - `try_number` of pod: 1
   [2024-02-21, 00:08:44 UTC] {pod_manager.py:373} WARNING - Pod not yet 
started: k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t
   [2024-02-21, 00:08:45 UTC] {pod_manager.py:373} WARNING - Pod not yet 
started: k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t
   [2024-02-21, 00:08:46 UTC] {pod_manager.py:373} WARNING - Pod not yet 
started: k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t
   [2024-02-21, 00:08:47 UTC] {pod_manager.py:466} INFO - [base] + python -c 
'import base64, os;x = base64.b64decode(os.environ["__PYTHON_SCRIPT"]);f = 
open("/tmp/script.py", "wb"); f.write(x); f.close()'
   [2024-02-21, 00:08:47 UTC] {pod_manager.py:466} INFO - [base] + python -c 
'import base64, os;x = base64.b64decode(os.environ["__PYTHON_INPUT"]);f = 
open("/tmp/script.in", "wb"); f.write(x); f.close()'
   [2024-02-21, 00:08:47 UTC] {pod_manager.py:466} INFO - [base] + mkdir -p 
/***/xcom
   2024-02-21T00:08:49.598549087Z 
   2024-02-21T00:08:49.598558379Z 
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] + python 
/tmp/script.py /tmp/script.in /***/xcom/return.json
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] Traceback 
(most recent call last):
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]   File 
"/home/***/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 
1910, in _execute_context
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]     
self.dialect.do_execute(
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]   File 
"/home/***/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", 
line 736, in do_execute
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]     
cursor.execute(statement, parameters)
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] 
sqlite3.OperationalError: no such table: xcom
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] The above 
exception was the direct cause of the following exception:
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] Traceback 
(most recent call last):
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]   File 
"/tmp/script.py", line 24, in <module>
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]     res = 
sum_it_k8s_***(*arg_dict["args"], **arg_dict["kwargs"])
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]           
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]   File 
"/tmp/script.py", line 21, in sum_it_k8s_***
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]     total = 
sum(values)
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]             
^^^^^^^^^^^
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]   File 
"/home/***/.local/lib/python3.11/site-packages/***/models/xcom.py", line 721, 
in __next__
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]     return 
XCom.deserialize_value(next(self._it))
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]                
                   ^^^^^^^^^^^^^^
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]   File 
"/home/***/.local/lib/python3.11/site-packages/sqlalchemy/orm/query.py", line 
2901, in __iter__
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]     result = 
self._iter()
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]              
^^^^^^^^^^^^
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]   File 
"/home/***/.local/lib/python3.11/site-packages/sqlalchemy/orm/query.py", line 
2916, in _iter
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]     result = 
self.session.execute(
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]              
^^^^^^^^^^^^^^^^^^^^^
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]   File 
"/home/***/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 
1717, in execute
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]     result = 
conn._execute_20(statement, params or {}, execution_options)
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]              
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]   File 
"/home/***/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 
1710, in _execute_20
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]     return 
meth(self, args_10style, kwargs_10style, execution_options)
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]            
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]   File 
"/home/***/.local/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", 
line 334, in _execute_on_connection
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]     return 
connection._execute_clauseelement(
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]            
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]   File 
"/home/***/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 
1577, in _execute_clauseelement
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]     ret = 
self._execute_context(
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]           
^^^^^^^^^^^^^^^^^^^^^^
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]   File 
"/home/***/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 
1953, in _execute_context
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]     
self._handle_dbapi_exception(
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]   File 
"/home/***/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 
2134, in _handle_dbapi_exception
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]     
util.raise_(
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]   File 
"/home/***/.local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 
211, in raise_
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]     raise 
exception
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]   File 
"/home/***/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 
1910, in _execute_context
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]     
self.dialect.do_execute(
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]   File 
"/home/***/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", 
line 736, in do_execute
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base]     
cursor.execute(statement, parameters)
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] 
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: xcom
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] [SQL: SELECT 
xcom.value
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] FROM xcom JOIN 
dag_run ON xcom.dag_run_id = dag_run.id
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:466} INFO - [base] WHERE xcom.key 
= 'return_value' AND xcom.task_id = 'add_one_python' AND xcom.dag_id = 
'example_dynamic_task_mapping_with_k8s' AND xcom.run_id = 
'scheduled__2024-02-21T00:07:00+00:00' ORDER BY xcom.map_index ASC]
   [2024-02-21, 00:08:50 UTC] {pod_manager.py:483} INFO - [base] (Background on 
this error at: https://sqlalche.me/e/14/e3q8)
   [2024-02-21, 00:08:50 UTC] {pod_manager.py:718} INFO - Checking if xcom 
sidecar container is started.
   [2024-02-21, 00:08:50 UTC] {pod_manager.py:721} INFO - The xcom sidecar 
container is started.
   [2024-02-21, 00:08:50 UTC] {pod_manager.py:798} INFO - Running command... if 
[ -s /***/xcom/return.json ]; then cat /***/xcom/return.json; else echo 
__***_xcom_result_empty__; fi
   [2024-02-21, 00:08:50 UTC] {pod_manager.py:798} INFO - Running command... 
kill -s SIGINT 1
   [2024-02-21, 00:08:51 UTC] {pod.py:559} INFO - xcom result file is empty.
   [2024-02-21, 00:08:51 UTC] {pod_manager.py:616} INFO - Pod 
k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t has phase Running
   [2024-02-21, 00:08:53 UTC] {pod_manager.py:616} INFO - Pod 
k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t has phase Running
   [2024-02-21, 00:08:55 UTC] {pod.py:909} INFO - Deleting pod: 
k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t
   [2024-02-21, 00:08:55 UTC] {taskinstance.py:2698} ERROR - Task failed with 
exception
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py",
 line 433, in _execute_task
       result = execute_callable(context=context, **execute_callable_kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/decorators/kubernetes.py",
 line 128, in execute
       return super().execute(context)
              ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/decorators/base.py", 
line 241, in execute
       return_value = super().execute(context)
                      ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py",
 line 570, in execute
       return self.execute_sync(context)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py",
 line 629, in execute_sync
       self.cleanup(
     File 
"/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py",
 line 839, in cleanup
       raise AirflowException(
   airflow.exceptions.AirflowException: Pod 
k8s-***-pod-5c04d41f692a4ae7ac7f979d252aa9d3-ewqd7l4t returned a failure.
   remote_pod: {'api_version': 'v1',
    'kind': 'Pod',
    'metadata': {'annotations': None,
                 'creation_timestamp': datetime.datetime(2024, 2, 21, 0, 8, 44, 
tzinfo=tzlocal()),
    ...
               'start_time': datetime.datetime(2024, 2, 21, 0, 8, 44, 
tzinfo=tzlocal())}}; 3044)
   [2024-02-21, 00:08:55 UTC] {local_task_job_runner.py:234} INFO - Task exited 
with return code 1
   [2024-02-21, 00:08:55 UTC] {taskinstance.py:3280} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   ```
   </details>
   <details>
   <summary>sum_it_k8s_airflow_config</summary>
   
   ```log
   airflow-worker-0.airflow-worker.default.svc.cluster.local
   *** Found logs served from host 
http://airflow-worker-0.airflow-worker.default.svc.cluster.local:8793/log/dag_id=example_dynamic_task_mapping_with_k8s/run_id=scheduled__2024-02-21T00:07:00+00:00/task_id=sum_it_k8s_airflow_config/attempt=1.log
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all 
met for dep_context=non-requeueable deps ti=<TaskInstance: 
example_dynamic_task_mapping_with_k8s.sum_it_k8s_airflow_config 
scheduled__2024-02-21T00:07:00+00:00 [queued]>
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all 
met for dep_context=requeueable deps ti=<TaskInstance: 
example_dynamic_task_mapping_with_k8s.sum_it_k8s_airflow_config 
scheduled__2024-02-21T00:07:00+00:00 [queued]>
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:2170} INFO - Starting attempt 1 
of 1
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:2191} INFO - Executing 
<Task(_KubernetesDecoratedOperator): sum_it_k8s_airflow_config> on 2024-02-21 
00:07:00+00:00
   [2024-02-21, 00:08:44 UTC] {standard_task_runner.py:60} INFO - Started 
process 3049 to run task
   [2024-02-21, 00:08:44 UTC] {standard_task_runner.py:87} INFO - Running: 
['***', 'tasks', 'run', 'example_dynamic_task_mapping_with_k8s', 
'sum_it_k8s_***_config', 'scheduled__2024-02-21T00:07:00+00:00', '--job-id', 
'51', '--raw', '--subdir', 
'DAGS_FOLDER/example-dag-k8s-dynamic-task-mapping.py', '--cfg-path', 
'/tmp/tmp04503lkh']
   [2024-02-21, 00:08:44 UTC] {standard_task_runner.py:88} INFO - Job 51: 
Subtask sum_it_k8s_***_config
   [2024-02-21, 00:08:44 UTC] {task_command.py:423} INFO - Running 
<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s_airflow_config 
scheduled__2024-02-21T00:07:00+00:00 [running]> on host 
***-worker-0.***-worker.default.svc.cluster.local
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:2480} INFO - Exporting env vars: 
AIRFLOW_CTX_DAG_OWNER='***' 
AIRFLOW_CTX_DAG_ID='example_dynamic_task_mapping_with_k8s' 
AIRFLOW_CTX_TASK_ID='sum_it_k8s_***_config' 
AIRFLOW_CTX_EXECUTION_DATE='2024-02-21T00:07:00+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-21T00:07:00+00:00'
   [2024-02-21, 00:08:44 UTC] {warnings.py:109} WARNING - 
/home/***/.local/lib/python3.11/site-packages/***/providers/cncf/kubernetes/operators/pod.py:1036:
 AirflowProviderDeprecationWarning: This function is deprecated. Please use 
`add_unique_suffix`.
     pod.metadata.name = add_pod_suffix(pod_name=pod.metadata.name)
   [2024-02-21, 00:08:44 UTC] {pod.py:1055} INFO - Building pod 
k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k with labels: {'dag_id': 
'example_dynamic_task_mapping_with_k8s', 'task_id': 'sum_it_k8s_***_config', 
'run_id': 'scheduled__2024-02-21T0007000000-7c62a84f4', 
'kubernetes_pod_operator': 'True', 'try_number': '1'}
   [2024-02-21, 00:08:44 UTC] {pod.py:526} INFO - Found matching pod 
k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k with labels 
{'airflow_kpo_in_cluster': 'True', 'airflow_version': '2.8.1', 'dag_id': 
'example_dynamic_task_mapping_with_k8s', 'kubernetes_pod_operator': 'True', 
'run_id': 'scheduled__2024-02-21T0007000000-7c62a84f4', 'task_id': 
'sum_it_k8s_***_config', 'try_number': '1'}
   [2024-02-21, 00:08:44 UTC] {pod.py:527} INFO - `try_number` of 
task_instance: 1
   [2024-02-21, 00:08:44 UTC] {pod.py:528} INFO - `try_number` of pod: 1
   [2024-02-21, 00:08:44 UTC] {pod_manager.py:373} WARNING - Pod not yet 
started: k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k
   [2024-02-21, 00:08:45 UTC] {pod_manager.py:373} WARNING - Pod not yet 
started: k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k
   [2024-02-21, 00:08:46 UTC] {pod_manager.py:373} WARNING - Pod not yet 
started: k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k
   [2024-02-21, 00:08:47 UTC] {pod_manager.py:373} WARNING - Pod not yet 
started: k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k
   [2024-02-21, 00:08:48 UTC] {pod_manager.py:466} INFO - [base] + python -c 
'import base64, os;x = base64.b64decode(os.environ["__PYTHON_SCRIPT"]);f = 
open("/tmp/script.py", "wb"); f.write(x); f.close()'
   [2024-02-21, 00:08:48 UTC] {pod_manager.py:466} INFO - [base] + python -c 
'import base64, os;x = base64.b64decode(os.environ["__PYTHON_INPUT"]);f = 
open("/tmp/script.in", "wb"); f.write(x); f.close()'
   [2024-02-21, 00:08:48 UTC] {pod_manager.py:466} INFO - [base] + mkdir -p 
/***/xcom
   [2024-02-21, 00:08:51 UTC] {pod_manager.py:466} INFO - [base] + python 
/tmp/script.py /tmp/script.in /***/xcom/return.json
   [2024-02-21, 00:08:52 UTC] {pod_manager.py:483} INFO - [base] Total was 9
   [2024-02-21, 00:08:52 UTC] {pod_manager.py:511} WARNING - Pod 
k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k log read interrupted but 
container base still running
   [2024-02-21, 00:08:53 UTC] {pod_manager.py:483} INFO - [base] Total was 9
   [2024-02-21, 00:08:53 UTC] {pod_manager.py:718} INFO - Checking if xcom 
sidecar container is started.
   [2024-02-21, 00:08:53 UTC] {pod_manager.py:721} INFO - The xcom sidecar 
container is started.
   [2024-02-21, 00:08:53 UTC] {pod_manager.py:798} INFO - Running command... if 
[ -s /***/xcom/return.json ]; then cat /***/xcom/return.json; else echo 
__***_xcom_result_empty__; fi
   [2024-02-21, 00:08:53 UTC] {pod_manager.py:798} INFO - Running command... 
kill -s SIGINT 1
   [2024-02-21, 00:08:53 UTC] {pod.py:559} INFO - xcom result file is empty.
   [2024-02-21, 00:08:53 UTC] {pod_manager.py:616} INFO - Pod 
k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k has phase Running
   [2024-02-21, 00:08:55 UTC] {pod.py:909} INFO - Deleting pod: 
k8s-***-pod-3cf2803e8d4d496c85e6ca2b1ea5a790-e9zzie8k
   [2024-02-21, 00:08:55 UTC] {taskinstance.py:1138} INFO - Marking task as 
SUCCESS. dag_id=example_dynamic_task_mapping_with_k8s, 
task_id=sum_it_k8s_***_config, execution_date=20240221T000700, 
start_date=20240221T000844, end_date=20240221T000855
   [2024-02-21, 00:08:55 UTC] {local_task_job_runner.py:234} INFO - Task exited 
with return code 0
   [2024-02-21, 00:08:55 UTC] {taskinstance.py:3280} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   ```
   </details>
   <details>
   <summary>evaluate_lazy_xcoms_python</summary>
   
   ```log
   airflow-worker-0.airflow-worker.default.svc.cluster.local
   *** Found logs served from host 
http://airflow-worker-0.airflow-worker.default.svc.cluster.local:8793/log/dag_id=example_dynamic_task_mapping_with_k8s/run_id=scheduled__2024-02-21T00:07:00+00:00/task_id=evaluate_lazy_xcoms_python/attempt=1.log
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all 
met for dep_context=non-requeueable deps ti=<TaskInstance: 
example_dynamic_task_mapping_with_k8s.evaluate_lazy_xcoms_python 
scheduled__2024-02-21T00:07:00+00:00 [queued]>
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:1956} INFO - Dependencies all 
met for dep_context=requeueable deps ti=<TaskInstance: 
example_dynamic_task_mapping_with_k8s.evaluate_lazy_xcoms_python 
scheduled__2024-02-21T00:07:00+00:00 [queued]>
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:2170} INFO - Starting attempt 1 
of 1
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:2191} INFO - Executing 
<Task(_PythonDecoratedOperator): evaluate_lazy_xcoms_python> on 2024-02-21 
00:07:00+00:00
   [2024-02-21, 00:08:44 UTC] {standard_task_runner.py:60} INFO - Started 
process 3048 to run task
   [2024-02-21, 00:08:44 UTC] {standard_task_runner.py:87} INFO - Running: 
['***', 'tasks', 'run', 'example_dynamic_task_mapping_with_k8s', 
'evaluate_lazy_xcoms_python', 'scheduled__2024-02-21T00:07:00+00:00', 
'--job-id', '50', '--raw', '--subdir', 
'DAGS_FOLDER/example-dag-k8s-dynamic-task-mapping.py', '--cfg-path', 
'/tmp/tmps0lkgd19']
   [2024-02-21, 00:08:44 UTC] {standard_task_runner.py:88} INFO - Job 50: 
Subtask evaluate_lazy_xcoms_python
   [2024-02-21, 00:08:44 UTC] {task_command.py:423} INFO - Running 
<TaskInstance: example_dynamic_task_mapping_with_k8s.evaluate_lazy_xcoms_python 
scheduled__2024-02-21T00:07:00+00:00 [running]> on host 
***-worker-0.***-worker.default.svc.cluster.local
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:2480} INFO - Exporting env vars: 
AIRFLOW_CTX_DAG_OWNER='***' 
AIRFLOW_CTX_DAG_ID='example_dynamic_task_mapping_with_k8s' 
AIRFLOW_CTX_TASK_ID='evaluate_lazy_xcoms_python' 
AIRFLOW_CTX_EXECUTION_DATE='2024-02-21T00:07:00+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-21T00:07:00+00:00'
   [2024-02-21, 00:08:44 UTC] {python.py:201} INFO - Done. Returned value was: 
[2, 3, 4]
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:1138} INFO - Marking task as 
SUCCESS. dag_id=example_dynamic_task_mapping_with_k8s, 
task_id=evaluate_lazy_xcoms_python, execution_date=20240221T000700, 
start_date=20240221T000844, end_date=20240221T000844
   [2024-02-21, 00:08:44 UTC] {local_task_job_runner.py:234} INFO - Task exited 
with return code 0
   [2024-02-21, 00:08:44 UTC] {taskinstance.py:3280} INFO - 1 downstream tasks 
scheduled from follow-on schedule check
   ```
   </details>
   <details>
   <summary>sum_it_k8s_lazyevaluated</summary>
   
   ```log
   airflow-worker-0.airflow-worker.default.svc.cluster.local
   *** Found logs served from host 
http://airflow-worker-0.airflow-worker.default.svc.cluster.local:8793/log/dag_id=example_dynamic_task_mapping_with_k8s/run_id=scheduled__2024-02-21T00:07:00+00:00/task_id=sum_it_k8s_lazyevaluated/attempt=1.log
   [2024-02-21, 00:08:45 UTC] {taskinstance.py:1956} INFO - Dependencies all 
met for dep_context=non-requeueable deps ti=<TaskInstance: 
example_dynamic_task_mapping_with_k8s.sum_it_k8s_lazyevaluated 
scheduled__2024-02-21T00:07:00+00:00 [queued]>
   [2024-02-21, 00:08:45 UTC] {taskinstance.py:1956} INFO - Dependencies all 
met for dep_context=requeueable deps ti=<TaskInstance: 
example_dynamic_task_mapping_with_k8s.sum_it_k8s_lazyevaluated 
scheduled__2024-02-21T00:07:00+00:00 [queued]>
   [2024-02-21, 00:08:45 UTC] {taskinstance.py:2170} INFO - Starting attempt 1 
of 1
   [2024-02-21, 00:08:45 UTC] {taskinstance.py:2191} INFO - Executing 
<Task(_KubernetesDecoratedOperator): sum_it_k8s_lazyevaluated> on 2024-02-21 
00:07:00+00:00
   [2024-02-21, 00:08:45 UTC] {standard_task_runner.py:60} INFO - Started 
process 3099 to run task
   [2024-02-21, 00:08:45 UTC] {standard_task_runner.py:87} INFO - Running: 
['***', 'tasks', 'run', 'example_dynamic_task_mapping_with_k8s', 
'sum_it_k8s_lazyevaluated', 'scheduled__2024-02-21T00:07:00+00:00', '--job-id', 
'54', '--raw', '--subdir', 
'DAGS_FOLDER/example-dag-k8s-dynamic-task-mapping.py', '--cfg-path', 
'/tmp/tmph4g0tr_h']
   [2024-02-21, 00:08:45 UTC] {standard_task_runner.py:88} INFO - Job 54: 
Subtask sum_it_k8s_lazyevaluated
   [2024-02-21, 00:08:45 UTC] {task_command.py:423} INFO - Running 
<TaskInstance: example_dynamic_task_mapping_with_k8s.sum_it_k8s_lazyevaluated 
scheduled__2024-02-21T00:07:00+00:00 [running]> on host 
***-worker-0.***-worker.default.svc.cluster.local
   [2024-02-21, 00:08:45 UTC] {taskinstance.py:2480} INFO - Exporting env vars: 
AIRFLOW_CTX_DAG_OWNER='***' 
AIRFLOW_CTX_DAG_ID='example_dynamic_task_mapping_with_k8s' 
AIRFLOW_CTX_TASK_ID='sum_it_k8s_lazyevaluated' 
AIRFLOW_CTX_EXECUTION_DATE='2024-02-21T00:07:00+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-21T00:07:00+00:00'
   [2024-02-21, 00:08:45 UTC] {warnings.py:109} WARNING - 
/home/***/.local/lib/python3.11/site-packages/***/providers/cncf/kubernetes/operators/pod.py:1036:
 AirflowProviderDeprecationWarning: This function is deprecated. Please use 
`add_unique_suffix`.
     pod.metadata.name = add_pod_suffix(pod_name=pod.metadata.name)
   [2024-02-21, 00:08:45 UTC] {pod.py:1055} INFO - Building pod 
k8s-***-pod-1ee5ebb7c4804192aecb46008bb6c0cf-8pch0mt4 with labels: {'dag_id': 
'example_dynamic_task_mapping_with_k8s', 'task_id': 'sum_it_k8s_lazyevaluated', 
'run_id': 'scheduled__2024-02-21T0007000000-7c62a84f4', 
'kubernetes_pod_operator': 'True', 'try_number': '1'}
   [2024-02-21, 00:08:45 UTC] {pod.py:526} INFO - Found matching pod 
k8s-***-pod-1ee5ebb7c4804192aecb46008bb6c0cf-8pch0mt4 with labels 
{'airflow_kpo_in_cluster': 'True', 'airflow_version': '2.8.1', 'dag_id': 
'example_dynamic_task_mapping_with_k8s', 'kubernetes_pod_operator': 'True', 
'run_id': 'scheduled__2024-02-21T0007000000-7c62a84f4', 'task_id': 
'sum_it_k8s_lazyevaluated', 'try_number': '1'}
   [2024-02-21, 00:08:45 UTC] {pod.py:527} INFO - `try_number` of 
task_instance: 1
   [2024-02-21, 00:08:45 UTC] {pod.py:528} INFO - `try_number` of pod: 1
   [2024-02-21, 00:08:45 UTC] {pod_manager.py:373} WARNING - Pod not yet 
started: k8s-***-pod-1ee5ebb7c4804192aecb46008bb6c0cf-8pch0mt4
   [2024-02-21, 00:08:46 UTC] {pod_manager.py:373} WARNING - Pod not yet 
started: k8s-***-pod-1ee5ebb7c4804192aecb46008bb6c0cf-8pch0mt4
   [2024-02-21, 00:08:47 UTC] {pod_manager.py:373} WARNING - Pod not yet 
started: k8s-***-pod-1ee5ebb7c4804192aecb46008bb6c0cf-8pch0mt4
   [2024-02-21, 00:08:48 UTC] {pod_manager.py:466} INFO - [base] + python -c 
'import base64, os;x = base64.b64decode(os.environ["__PYTHON_SCRIPT"]);f = 
open("/tmp/script.py", "wb"); f.write(x); f.close()'
   [2024-02-21, 00:08:48 UTC] {pod_manager.py:466} INFO - [base] + python -c 
'import base64, os;x = base64.b64decode(os.environ["__PYTHON_INPUT"]);f = 
open("/tmp/script.in", "wb"); f.write(x); f.close()'
   [2024-02-21, 00:08:48 UTC] {pod_manager.py:466} INFO - [base] + mkdir -p 
/***/xcom
   [2024-02-21, 00:08:48 UTC] {pod_manager.py:466} INFO - [base] + python 
/tmp/script.py /tmp/script.in /***/xcom/return.json
   [2024-02-21, 00:08:48 UTC] {pod_manager.py:483} INFO - [base] Total was 9
   [2024-02-21, 00:08:48 UTC] {pod_manager.py:718} INFO - Checking if xcom 
sidecar container is started.
   [2024-02-21, 00:08:48 UTC] {pod_manager.py:721} INFO - The xcom sidecar 
container is started.
   [2024-02-21, 00:08:48 UTC] {pod_manager.py:798} INFO - Running command... if 
[ -s /***/xcom/return.json ]; then cat /***/xcom/return.json; else echo 
__***_xcom_result_empty__; fi
   [2024-02-21, 00:08:48 UTC] {pod_manager.py:798} INFO - Running command... 
kill -s SIGINT 1
   [2024-02-21, 00:08:49 UTC] {pod.py:559} INFO - xcom result file is empty.
   [2024-02-21, 00:08:49 UTC] {pod_manager.py:616} INFO - Pod 
k8s-***-pod-1ee5ebb7c4804192aecb46008bb6c0cf-8pch0mt4 has phase Running
   [2024-02-21, 00:08:52 UTC] {pod.py:909} INFO - Deleting pod: 
k8s-***-pod-1ee5ebb7c4804192aecb46008bb6c0cf-8pch0mt4
   [2024-02-21, 00:08:52 UTC] {taskinstance.py:1138} INFO - Marking task as 
SUCCESS. dag_id=example_dynamic_task_mapping_with_k8s, 
task_id=sum_it_k8s_lazyevaluated, execution_date=20240221T000700, 
start_date=20240221T000845, end_date=20240221T000852
   [2024-02-21, 00:08:52 UTC] {local_task_job_runner.py:234} INFO - Task exited 
with return code 0
   [2024-02-21, 00:08:52 UTC] {taskinstance.py:3280} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   ```
   </details>
   </details>
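
For reference, the eager-loading workaround visible in the logs above (`evaluate_lazy_xcoms_python` returning `[2, 3, 4]`, then `sum_it_k8s_lazyevaluated` printing `Total was 9`) can be sketched in plain Python. This is only an illustration under stated assumptions: `LazySequence` and `evaluate_lazy` are hypothetical stand-ins, not Airflow's `LazyXComAccess` or any provider API, and JSON is used here purely as an example serialization format.

```python
import json
from collections.abc import Sequence


class LazySequence(Sequence):
    """Hypothetical stand-in for a lazily evaluated sequence (NOT Airflow's class)."""

    def __init__(self, loader):
        # loader: zero-argument callable returning a fresh iterable of values
        self._loader = loader

    def __len__(self):
        return sum(1 for _ in self._loader())

    def __getitem__(self, index):
        return list(self._loader())[index]

    def __iter__(self):
        return iter(self._loader())


def evaluate_lazy(values):
    """Materialize any sequence into a plain list before handing it downstream."""
    return list(values)


# Mirrors the mapped "add one" results for the inputs [1, 2, 3].
lazy = LazySequence(lambda: (x + 1 for x in [1, 2, 3]))

eager = evaluate_lazy(lazy)   # plain list, safe to serialize
payload = json.dumps(eager)   # what an isolated pod could receive as input
total = sum(json.loads(payload))
print(f"Total was {total}")
```

The request in this issue is for `@task.kubernetes` to do the equivalent of `evaluate_lazy` itself, so that no intermediate Airflow-dependent task is needed.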
   
   ### Operating System
   
   macOS 13.4 (22F66)
   
   ### Versions of Apache Airflow Providers
   
   `apache-airflow-providers-cncf-kubernetes` 8.0.0
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   <details>
   <summary>Host specifications</summary>
   
   - MacBook Pro with Apple M2 Pro
     - macOS 13.4 (22F66)
     - Darwin 22.5.0
   - Local Kubernetes 1.27.7 
     - [k3d](https://k3d.io/) v5.6.0 using image `rancher/k3s:v1.27.7-k3s1`
     - Docker Desktop with Rosetta for x86/amd64 emulation on Apple Silicon
   </details>
   
   <details>
   <summary>Airflow specifications</summary>
   
   - Airflow 2.8.1 with Python 3.11
     - Based on the `apache/airflow:2.8.1-python3.11` image
   - Deployed using [airflow-helm](https://github.com/airflow-helm/charts) 8.8.0
     - `helm version` = version.BuildInfo{Version:"v3.12.3", 
GitCommit:"3a31588ad33fe3b89af5a2a54ee1d25bfe6eaa5e", GitTreeState:"clean", 
GoVersion:"go1.20.7"}
   - `apache-airflow-providers-cncf-kubernetes` 8.0.0
   </details>
   
   <details>
   <summary>Dockerfile</summary>
   
   ### Dockerfile
   ```dockerfile
   FROM apache/airflow:2.8.1-python3.11
   
   COPY requirements.txt requirements.txt
   RUN pip install --upgrade pip
   RUN pip install -r requirements.txt
   
   ```
   ### requirements.txt
   ```txt
   apache-airflow-providers-cncf-kubernetes==8.0.0
   
   ```
   </details>
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

