davidharkis commented on issue #47283:
URL: https://github.com/apache/airflow/issues/47283#issuecomment-2763238308

   > [@davidharkis](https://github.com/davidharkis) your issue might be related (or identical to) [#47780](https://github.com/apache/airflow/issues/47780)
   
   Thanks for pointing me to that, @RNHTTR. That issue is something I can reproduce quite easily, but it isn't the cause of this one.
   
   However! Trying out the Job operator has produced a big revelation. After running one of the failing DAGs with it instead (overloaded to fix [#47780](https://github.com/apache/airflow/issues/47780)), I'm still getting a 404 occasionally (again: only under high load, and only in the production environment).
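
   For reference, the overload is shaped roughly like this (a minimal sketch: the class name is made up and our actual #47780 workaround is elided, but the polling call is the one visible in the traceback below):

   ```python
   from airflow.providers.cncf.kubernetes.triggers.job import KubernetesJobTrigger
   from airflow.triggers.base import TriggerEvent


   class PatchedKubernetesJobTrigger(KubernetesJobTrigger):
       """The stock job trigger with our #47780 workaround applied (details elided)."""

       async def run(self):
           # The same polling call the provider trigger makes -- this is what
           # raised the 404 below.
           job = await self.hook.wait_until_job_complete(
               name=self.job_name, namespace=self.job_namespace
           )
           # (The real trigger also inspects the job's conditions before
           # reporting success; omitted here for brevity.)
           yield TriggerEvent(
               {"name": self.job_name, "namespace": self.job_namespace, "status": "success"}
           )
   ```

   One of the failing task logs: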
   
   ```
   core-identity-cluid-stats-daily-dbt-cluid-stats-v6-pub-1d210ed3
   [2025-03-29, 04:10:39 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
   [2025-03-29, 04:10:40 UTC] {base.py:84} INFO - Retrieving connection 'kubernetes_aws'
   [2025-03-29, 04:10:40 UTC] {pod.py:1148} INFO - Building pod dbt-cluid-stats-v6-public-u8l4yigh with labels: {'dag_id': 'core-identity-cluid-stats-daily', 'task_id': 'dbt-cluid-stats-v6-public', 'run_id': 'dataset_triggered__2025-03-29T040933.7980870000-bd82c0449', 'kubernetes_pod_operator': 'True', 'map_index': '37', 'try_number': '1'}
   [2025-03-29, 04:10:41 UTC] {job.py:349} INFO - Building job job-dbt-cluid-stats-v6-public-eu1tnba3
   [2025-03-29, 04:10:42 UTC] {pod.py:545} INFO - Found matching pod job-dbt-cluid-stats-v6-public-eu1tnba3-s2nth with labels {'airflow_kpo_in_cluster': 'False', 'airflow_version': '2.10.5', 'batch.kubernetes.io/controller-uid': '5d9fe31d-1c02-49d9-b9d7-cf014f1dd72f', 'batch.kubernetes.io/job-name': 'job-dbt-cluid-stats-v6-public-eu1tnba3', 'controller-uid': '5d9fe31d-1c02-49d9-b9d7-cf014f1dd72f', 'dag_id': 'core-identity-cluid-stats-daily', 'job-name': 'job-dbt-cluid-stats-v6-public-eu1tnba3', 'kubernetes_pod_operator': 'True', 'map_index': '37', 'run_id': 'dataset_triggered__2025-03-29T040933.7980870000-bd82c0449', 'task_id': 'dbt-cluid-stats-v6-public', 'try_number': '1'}
   [2025-03-29, 04:10:42 UTC] {pod.py:546} INFO - `try_number` of task_instance: 1
   [2025-03-29, 04:10:42 UTC] {pod.py:547} INFO - `try_number` of pod: 1
   [2025-03-29, 04:10:42 UTC] {taskinstance.py:289} INFO - Pausing task as DEFERRED. dag_id=core-identity-cluid-stats-daily, task_id=dbt-cluid-stats-v6-public, run_id=dataset_triggered__2025-03-29T04:09:33.798087+00:00, execution_date=20250329T040933, start_date=20250329T041039
   [2025-03-29, 04:10:42 UTC] {taskinstance.py:341} ▶ Post task execution logs
   [2025-03-29, 04:10:43 UTC] {kubernetes.py:899} INFO - Requesting status for the job 'job-dbt-cluid-stats-v6-public-eu1tnba3'
   [2025-03-29, 04:14:37 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
   [2025-03-29, 04:14:37 UTC] {baseoperator.py:1810} ERROR - Trigger failed:
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 558, in cleanup_finished_triggers
       result = details["task"].result()
                ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 630, in run_trigger
       async for event in trigger.run():
     File "/opt/code/id5/kubernetes/trigger/job.py", line 115, in run
       job: V1Job = await self.hook.wait_until_job_complete(name=self.job_name, namespace=self.job_namespace)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 900, in wait_until_job_complete
       job: V1Job = await self.get_job_status(name=name, namespace=namespace)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 883, in get_job_status
       job: V1Job = await v1_api.read_namespaced_job_status(
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes_asyncio/client/api_client.py", line 192, in __call_api
       raise e
     File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes_asyncio/client/api_client.py", line 185, in __call_api
       response_data = await self.request(
                       ^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes_asyncio/client/rest.py", line 195, in GET
       return (await self.request("GET", url,
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes_asyncio/client/rest.py", line 189, in request
       raise ApiException(http_resp=r)
   kubernetes_asyncio.client.exceptions.ApiException: (404)
   Reason: Not Found
   HTTP response headers: <CIMultiDictProxy('Audit-Id': '367329f8-fc4b-4d3b-9ae8-e71b54b54238', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '83f2afc0-22a9-4eef-adaa-d9af4d5febf6', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'd0830089-6ed9-4917-90ac-dc395de94ce8', 'Date': 'Sat, 29 Mar 2025 04:10:43 GMT', 'Content-Length': '266')>
   HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch \"job-dbt-cluid-stats-v6-public-eu1tnba3\" not found","reason":"NotFound","details":{"name":"job-dbt-cluid-stats-v6-public-eu1tnba3","group":"batch","kind":"jobs"},"code":404}
   [2025-03-29, 04:14:37 UTC] {taskinstance.py:3313} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 768, in _execute_task
       result = _execute_callable(context=context, **execute_callable_kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 734, in _execute_callable
       return ExecutionCallableRunner(
              ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
       return self.func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 1811, in resume_execution
       raise TaskDeferralError(next_kwargs.get("error", "Unknown"))
   airflow.exceptions.TaskDeferralError: Trigger failure
   [2025-03-29, 04:14:37 UTC] {taskinstance.py:1226} INFO - Marking task as UP_FOR_RETRY. dag_id=core-identity-cluid-stats-daily, task_id=dbt-cluid-stats-v6-public, run_id=dataset_triggered__2025-03-29T04:09:33.798087+00:00, map_index=37, execution_date=20250329T040933, start_date=20250329T041039, end_date=20250329T041437
   [2025-03-29, 04:14:37 UTC] {taskinstance.py:341} ▶ Post task execution logs
   ```
   
   This shouldn't be possible: the job and pod weren't deleted for another ~10 minutes!
   
   ```
   
   @timestamp:           2025-03-29 04:24:07.146
   @logStream:           kube-apiserver-audit-80d946be7f5ecefd8b8db06a133c54a7
   verb:                 delete
   requestURI:           /apis/batch/v1/namespaces/airflow-tasks-prod/jobs/job-dbt-cluid-stats-v6-public-eu1tnba3
   userAgent:            kube-controller-manager/v1.31.6 (linux/arm64) kubernetes/7555883/system:serviceaccount:kube-system:ttl-after-finished-controller
   user.username:        system:serviceaccount:kube-system:ttl-after-finished-controller
   objectRef.name:       job-dbt-cluid-stats-v6-public-eu1tnba3
   objectRef.namespace:  airflow-tasks-prod
   responseStatus.code:  200
   ```
   
   So where did the 404 come from? The wrong K8s cluster!
   
   Our environment is as follows:
   
   - Nomad: Runs Airflow (Triggerer, Webserver)
   - On-Prem K8s: Runs Airflow Kubernetes Executor workers & some Pod/Job operator tasks
   - EKS: Runs Pod/Job operator tasks for the larger DAGs such as the one we're 
seeing here
   
   
   Note: in no case do we have `in_cluster=true`.
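
   To illustrate how tasks pick a cluster (a sketch; `kubernetes_aws` is the connection ID from the log above, while `kubernetes_onprem` and the images are stand-ins):

   ```python
   from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator

   # Large DAGs target EKS through the "kubernetes_aws" connection.
   eks_task = KubernetesJobOperator(
       task_id="dbt-cluid-stats-v6-public",
       name="dbt-cluid-stats-v6-public",
       image="dbt-runner:latest",  # stand-in image
       kubernetes_conn_id="kubernetes_aws",  # kubeconfig pointing at EKS
       in_cluster=False,
   )

   # Smaller tasks target the on-prem cluster through a second connection.
   onprem_task = KubernetesJobOperator(
       task_id="some-onprem-task",
       name="some-onprem-task",
       image="other-runner:latest",  # stand-in image
       kubernetes_conn_id="kubernetes_onprem",  # stand-in ID for the on-prem connection
       in_cluster=False,
   )
   ```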
   
   It was the on-prem K8s cluster, where the worker was running, that reported the 404, even though the tasks were running in the EKS cluster. The `Audit-Id` of the 404 response above (`367329f8-fc4b-4d3b-9ae8-e71b54b54238`) matches the `auditID` in the on-prem API server's audit log:
   
   ```
   1743221443553        {"file":"/var/log/audit/kube/kube-apiserver.log","host":"controlplane-1","message":"{\"kind\":\"Event\",\"apiVersion\":\"audit.k8s.io/v1\",\"level\":\"Metadata\",\"auditID\":\"367329f8-fc4b-4d3b-9ae8-e71b54b54238\",\"stage\":\"ResponseComplete\",\"requestURI\":\"/apis/batch/v1/namespaces/airflow-tasks-prod/jobs/job-dbt-cluid-stats-v6-public-eu1tnba3/status\",\"verb\":\"get\",\"user\":{\"username\":\"admin\",\"groups\":[\"system:masters\",\"system:authenticated\"]},\"sourceIPs\":[\"10.0.1.31\"],\"userAgent\":\"OpenAPI-Generator/30.1.0/python\",\"objectRef\":{\"resource\":\"jobs\",\"namespace\":\"airflow-tasks-prod\",\"name\":\"job-dbt-cluid-stats-v6-public-eu1tnba3\",\"apiGroup\":\"batch\",\"apiVersion\":\"v1\",\"subresource\":\"status\"},\"responseStatus\":{\"metadata\":{},\"status\":\"Failure\",\"message\":\"jobs.batch \\\"job-dbt-cluid-stats-v6-public-eu1tnba3\\\" not found\",\"reason\":\"NotFound\",\"details\":{\"name\":\"job-dbt-cluid-stats-v6-public-eu1tnba3\",\"group\":\"batch\",\"kind\":\"jobs\"},\"code\":404},\"requestReceivedTimestamp\":\"2025-03-29T04:10:43.349813Z\",\"stageTimestamp\":\"2025-03-29T04:10:43.350679Z\",\"annotations\":{\"authorization.k8s.io/decision\":\"allow\",\"authorization.k8s.io/reason\":\"\"}}","source_type":"file"}
   1743221443553        {"file":"/var/log/audit/kube/kube-apiserver.log","host":"controlplane-1","message":"{\"kind\":\"Event\",\"apiVersion\":\"audit.k8s.io/v1\",\"level\":\"Metadata\",\"auditID\":\"367329f8-fc4b-4d3b-9ae8-e71b54b54238\",\"stage\":\"RequestReceived\",\"requestURI\":\"/apis/batch/v1/namespaces/airflow-tasks-prod/jobs/job-dbt-cluid-stats-v6-public-eu1tnba3/status\",\"verb\":\"get\",\"user\":{\"username\":\"admin\",\"groups\":[\"system:masters\",\"system:authenticated\"]},\"sourceIPs\":[\"10.0.1.31\"],\"userAgent\":\"OpenAPI-Generator/30.1.0/python\",\"objectRef\":{\"resource\":\"jobs\",\"namespace\":\"airflow-tasks-prod\",\"name\":\"job-dbt-cluid-stats-v6-public-eu1tnba3\",\"apiGroup\":\"batch\",\"apiVersion\":\"v1\",\"subresource\":\"status\"},\"requestReceivedTimestamp\":\"2025-03-29T04:10:43.349813Z\",\"stageTimestamp\":\"2025-03-29T04:10:43.349813Z\"}","source_type":"file"}
   ```
   
   This should obviously not be happening. We have two separate Airflow connections, and the majority of tasks work correctly even during high-load periods, but somewhere incorrect connections are being re-used.
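
   For anyone digging into this, here's a quick way to check what each connection resolves to (a sketch: the failing call actually goes through the async hook in the triggerer, but both hooks resolve the same Airflow connection; `kubernetes_onprem` is again a stand-in ID):

   ```python
   from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook

   for conn_id in ("kubernetes_aws", "kubernetes_onprem"):
       hook = KubernetesHook(conn_id=conn_id)
       api_client = hook.get_conn()
       # If clients were being crossed, the host printed here (or the one the
       # triggerer's client ends up with under load) would be the wrong API server.
       print(conn_id, "->", api_client.configuration.host)
   ```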

