tirkarthi commented on issue #47873:
URL: https://github.com/apache/airflow/issues/47873#issuecomment-2734114349
1. Run the `example_xcom` example DAG with a manual logical date.
2. Delete all DAG runs of that DAG: `delete from dag_run where dag_id='example_xcom';`
3. Run the `example_xcom` example DAG again with the same manual logical date.
4. The second run tries to set an XCom, which fails with an integrity error (a unique constraint violation on the `xcom` table) and a 409 response from the API server.
5. The scheduler crashes with the stack trace below.
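The comment does not say why step 4 collides. The error text in the log (`UNIQUE constraint failed: ...`) is SQLite's. One plausible mechanism, assumed here and not confirmed in the issue, is this: the manual `delete` in step 2 runs in a plain SQLite session where foreign-key enforcement is off (SQLite's default), so the old `xcom` row is not cascaded away. The re-created DAG run can then get the same integer `id` back, because SQLite's default rowid choice is one more than the largest rowid in the table (1 for an empty table). The sketch below replays that with Python's `sqlite3` module. The tables and the `replay_steps` helper are hypothetical and trimmed down; they are not Airflow's real schema.

```python
import sqlite3
from typing import Optional

RUN_ID = "manual__2025-03-18T17:17:52.732068+00:00"


def replay_steps(enforce_foreign_keys: bool) -> Optional[str]:
    """Replay steps 1-4 on a toy schema; return the IntegrityError text, if any.

    Hypothetical tables with just enough columns to hit the
    (dag_run_id, task_id, map_index, key) constraint reported in the log.
    """
    conn = sqlite3.connect(":memory:")
    # Must run before any transaction starts; OFF is SQLite's default.
    conn.execute(f"PRAGMA foreign_keys = {'ON' if enforce_foreign_keys else 'OFF'}")
    conn.execute("CREATE TABLE dag_run (id INTEGER PRIMARY KEY, dag_id TEXT, run_id TEXT)")
    conn.execute(
        """CREATE TABLE xcom (
               dag_run_id INTEGER REFERENCES dag_run (id) ON DELETE CASCADE,
               task_id TEXT, map_index INTEGER, "key" TEXT, value TEXT,
               PRIMARY KEY (dag_run_id, task_id, map_index, "key"))"""
    )

    def run_dag() -> None:
        # With no explicit id, SQLite picks max(rowid) + 1, i.e. 1 for an empty table.
        cur = conn.execute(
            "INSERT INTO dag_run (dag_id, run_id) VALUES (?, ?)", ("example_xcom", RUN_ID)
        )
        conn.execute(
            'INSERT INTO xcom (dag_run_id, task_id, map_index, "key", value) '
            "VALUES (?, ?, ?, ?, ?)",
            (cur.lastrowid, "push", -1, "value from pusher 1", "[1, 2, 3]"),
        )

    try:
        run_dag()  # step 1
        conn.execute("DELETE FROM dag_run WHERE dag_id = 'example_xcom'")  # step 2
        run_dag()  # steps 3 and 4
    except sqlite3.IntegrityError as err:
        return str(err)
    finally:
        conn.close()
    return None


print(replay_steps(enforce_foreign_keys=False))
print(replay_steps(enforce_foreign_keys=True))
```

With enforcement off, the second `run_dag()` hits the unique constraint; with it on, the cascade removes the stale row and the replay finishes without error.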
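This is probably not a realistic way to hit the problem, but the underlying issue is that an error like this should not crash the scheduler. Once the scheduler is in this state, restarting it retries the same API call, which fails with the same error and crashes the scheduler again. Going by the traceback, the crash happens in the LocalExecutor: the `ServerResponseError` raised in the worker is passed back through the executor's result queue, and unpickling it in the scheduler process fails with `TypeError` because `ServerResponseError.__init__()` requires the keyword-only arguments `request` and `response`.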
```
2025-03-18 22:48:38 [debug ] Received message from task runner [supervisor] msg=SetRenderedFields(rendered_fields={'templates_dict': None, 'op_args': '()', 'op_kwargs': {}}, type='SetRenderedFields')
[2025-03-18T22:48:38.260+0530] {_client.py:1025} INFO - HTTP Request: PUT http://localhost:8000/execution/task-instances/0195aa43-00f9-788f-aeb0-02eddfbdbd9d/rtif "HTTP/1.1 201 Created"
2025-03-18 22:48:38 [debug ] Workload process exited [supervisor] exit_code=0
2025-03-18 22:48:38 [debug ] Received message from task runner [supervisor] msg=SetXCom(key='value from pusher 1', value=[1, 2, 3], dag_id='example_xcom', run_id='manual__2025-03-18T17:17:52.732068+00:00', task_id='push', map_index=-1, mapped_length=None, type='SetXCom')
[2025-03-18T22:48:38.275+0530] {_client.py:1025} INFO - HTTP Request: POST http://localhost:8000/execution/xcoms/example_xcom/manual__2025-03-18T17:17:52.732068+00:00/push/value%20from%20pusher%201 "HTTP/1.1 409 Conflict"
2025-03-18 22:48:38 [warning ] Server error [airflow.sdk.api.client] detail={'detail': {'reason': 'Unique constraint violation', 'statement': 'INSERT INTO xcom (dag_run_id, task_id, map_index, "key", dag_id, run_id, value, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?)', 'orig_error': 'UNIQUE constraint failed: xcom.dag_run_id, xcom.task_id, xcom.map_index, xcom.key'}}
[2025-03-18T22:48:38.277+0530] {local_executor.py:96} ERROR - uhoh
Traceback (most recent call last):
  File "/home/karthikeyan/stuff/python/airflow/airflow/executors/local_executor.py", line 92, in _run_worker
    _execute_work(log, workload)
  File "/home/karthikeyan/stuff/python/airflow/airflow/executors/local_executor.py", line 113, in _execute_work
    supervise(
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 1105, in supervise
    exit_code = process.wait()
                ^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 715, in wait
    self._monitor_subprocess()
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 773, in _monitor_subprocess
    alive = self._service_subprocess(max_wait_time=max_wait_time) is None
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 610, in _service_subprocess
    need_more = socket_handler(key.fileobj)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 979, in cb
    gen.send(line)
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 499, in handle_requests
    self._handle_request(msg, log)
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 902, in _handle_request
    self.client.xcoms.set(
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/api/client.py", line 330, in set
    self.client.post(f"xcoms/{dag_id}/{run_id}/{task_id}/{key}", params=params, json=value)
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/httpx/_client.py", line 1144, in post
    return self.request(
           ^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py", line 336, in wrapped_f
    return copy(f, *args, **kw)
           ^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py", line 475, in __call__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py", line 376, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py", line 398, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
                                     ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py", line 478, in __call__
    result = fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/api/client.py", line 486, in request
    return super().request(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/httpx/_client.py", line 825, in request
    return self.send(request, auth=auth, follow_redirects=follow_redirects)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/httpx/_client.py", line 914, in send
    response = self._send_handling_auth(
               ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/httpx/_client.py", line 942, in _send_handling_auth
    response = self._send_handling_redirects(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/httpx/_client.py", line 999, in _send_handling_redirects
    raise exc
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/httpx/_client.py", line 982, in _send_handling_redirects
    hook(response)
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/api/client.py", line 111, in raise_on_4xx_5xx_with_note
    return get_json_error(response) or response.raise_for_status()
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/api/client.py", line 101, in get_json_error
    raise err
airflow.sdk.api.client.ServerResponseError: Server returned error
Correlation-id=0195aa43-b2b6-7a7c-88b0-4b75f2432925
[2025-03-18T22:48:39.216+0530] {scheduler_job_runner.py:941} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 937, in _execute
    self._run_scheduler_loop()
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 1071, in _run_scheduler_loop
    executor.heartbeat()
  File "/home/karthikeyan/stuff/python/airflow/airflow/traces/tracer.py", line 54, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/executors/base_executor.py", line 254, in heartbeat
    self.sync()
  File "/home/karthikeyan/stuff/python/airflow/airflow/executors/local_executor.py", line 205, in sync
    self._read_results()
  File "/home/karthikeyan/stuff/python/airflow/airflow/executors/local_executor.py", line 210, in _read_results
    key, state, exc = self.result_queue.get()
                      ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/multiprocessing/queues.py", line 367, in get
    return _ForkingPickler.loads(res)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: ServerResponseError.__init__() missing 2 required keyword-only arguments: 'request' and 'response'
[2025-03-18T22:48:39.218+0530] {local_executor.py:216} INFO - Shutting down LocalExecutor; waiting for running tasks to finish. Signal again if you don't want to wait.
[2025-03-18T22:48:39.240+0530] {scheduler_job_runner.py:948} ERROR - Exception when executing Executor.end on LocalExecutor(parallelism=32)
Traceback (most recent call last):
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 937, in _execute
    self._run_scheduler_loop()
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 1071, in _run_scheduler_loop
    executor.heartbeat()
  File "/home/karthikeyan/stuff/python/airflow/airflow/traces/tracer.py", line 54, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/executors/base_executor.py", line 254, in heartbeat
    self.sync()
  File "/home/karthikeyan/stuff/python/airflow/airflow/executors/local_executor.py", line 205, in sync
    self._read_results()
  File "/home/karthikeyan/stuff/python/airflow/airflow/executors/local_executor.py", line 210, in _read_results
    key, state, exc = self.result_queue.get()
                      ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/multiprocessing/queues.py", line 367, in get
    return _ForkingPickler.loads(res)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: ServerResponseError.__init__() missing 2 required keyword-only arguments: 'request' and 'response'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 946, in _execute
    executor.end()
  File "/home/karthikeyan/stuff/python/airflow/airflow/executors/local_executor.py", line 235, in end
    self._read_results()
  File "/home/karthikeyan/stuff/python/airflow/airflow/executors/local_executor.py", line 210, in _read_results
    key, state, exc = self.result_queue.get()
                      ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/multiprocessing/queues.py", line 367, in get
    return _ForkingPickler.loads(res)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: ServerResponseError.__init__() missing 2 required keyword-only arguments: 'request' and 'response'
[2025-03-18T22:48:39.242+0530] {scheduler_job_runner.py:953} INFO - Exited execute loop
Traceback (most recent call last):
  File "/home/karthikeyan/stuff/python/airflow/.venv/bin/airflow", line 10, in <module>
    sys.exit(main())
             ^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/__main__.py", line 58, in main
    args.func(args)
  File "/home/karthikeyan/stuff/python/airflow/airflow/cli/cli_config.py", line 49, in command
[2025-03-18 22:48:39 +0530] [13863] [INFO] Handling signal: term
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/utils/cli.py", line 111, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/scheduler_command.py", line 52, in scheduler
    run_command_with_daemon_option(
  File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/scheduler_command.py", line 55, in <lambda>
    callback=lambda: _run_scheduler_job(args),
                     ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/scheduler_command.py", line 43, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/home/karthikeyan/stuff/python/airflow/airflow/utils/session.py", line 101, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/job.py", line 342, in run_job
[2025-03-18 22:48:39 +0530] [13864] [INFO] Worker exiting (pid: 13864)
[2025-03-18 22:48:39 +0530] [13865] [INFO] Worker exiting (pid: 13865)
    return execute_job(job, execute_callable=execute_callable)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/job.py", line 371, in execute_job
    ret = execute_callable()
          ^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 937, in _execute
    self._run_scheduler_loop()
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 1071, in _run_scheduler_loop
    executor.heartbeat()
  File "/home/karthikeyan/stuff/python/airflow/airflow/traces/tracer.py", line 54, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/executors/base_executor.py", line 254, in heartbeat
    self.sync()
  File "/home/karthikeyan/stuff/python/airflow/airflow/executors/local_executor.py", line 205, in sync
    self._read_results()
  File "/home/karthikeyan/stuff/python/airflow/airflow/executors/local_executor.py", line 210, in _read_results
    key, state, exc = self.result_queue.get()
                      ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/multiprocessing/queues.py", line 367, in get
    return _ForkingPickler.loads(res)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: ServerResponseError.__init__() missing 2 required keyword-only arguments: 'request' and 'response'
[2025-03-18 22:48:39 +0530] [13863] [INFO] Shutting down: Master
```
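The final `TypeError` in the log can be reproduced without Airflow. Python pickles an exception as its class plus `self.args` (and its `__dict__`), so unpickling calls the class with positional arguments only, and an `__init__` with required keyword-only parameters fails exactly as shown above. The sketch below uses stand-in classes: `BrokenError`, `FixedError` and the `_rebuild_fixed` helper are hypothetical, not Airflow code. It uses plain `pickle`, which follows the same reduce protocol as the `_ForkingPickler` behind `multiprocessing` queues. `FixedError` shows one possible fix, a custom `__reduce__` that replays the keyword arguments; whether the real `httpx` request and response objects pickle cleanly is not checked here.

```python
import pickle


class BrokenError(Exception):
    """Stand-in for ServerResponseError: __init__ has keyword-only arguments."""

    def __init__(self, message: str, *, request: object, response: object) -> None:
        super().__init__(message)
        self.request = request
        self.response = response


def _rebuild_fixed(message: str, request: object, response: object) -> "FixedError":
    # pickle calls this with positional args; forward them as keywords.
    return FixedError(message, request=request, response=response)


class FixedError(BrokenError):
    def __reduce__(self):
        # The third item is applied via __setstate__, so other attributes
        # (for example notes from add_note() on Python 3.11+) survive too.
        return (_rebuild_fixed, (self.args[0], self.request, self.response), self.__dict__)


def round_trip(exc: BaseException) -> BaseException:
    # What a multiprocessing queue does: pickle on put(), unpickle on get().
    return pickle.loads(pickle.dumps(exc))


try:
    round_trip(BrokenError("Server returned error", request=None, response=None))
except TypeError as err:
    # On Python 3.10+ this is the log's message with BrokenError in place of
    # ServerResponseError.
    print(err)

fixed = round_trip(FixedError("Server returned error", request="req", response="resp"))
print(type(fixed).__name__, fixed.response)  # FixedError resp
```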