tirkarthi commented on issue #47873:
URL: https://github.com/apache/airflow/issues/47873#issuecomment-2734114349

   1. Run `example_xcom` example dag with a manual logical date.
   2. Delete all the dagruns associated with the dag `delete from dag_run where 
dag_id="example_xcom";`
   3. Run the `example_xcom` example dag with the same manual logical date.
   4. This results in the second run trying to set an xcom resulting in 
integrity error and 409 response from the API server.
   5. The scheduler crashes with below stacktrace.
   
   
   This might not be the ideal way but the issue is that any type of such 
validation error shouldn't crash the scheduler and once the scheduler enters 
this state restarting again tries the same API call which will result in 
validation error and crashes the scheduler.
   
   ```
   2025-03-18 22:48:38 [debug    ] Received message from task runner 
[supervisor] msg=SetRenderedFields(rendered_fields={'templates_dict': None, 
'op_args': '()', 'op_kwargs': {}}, type='SetRenderedFields')
   [2025-03-18T22:48:38.260+0530] {_client.py:1025} INFO - HTTP Request: PUT 
http://localhost:8000/execution/task-instances/0195aa43-00f9-788f-aeb0-02eddfbdbd9d/rtif
 "HTTP/1.1 201 Created"
   2025-03-18 22:48:38 [debug    ] Workload process exited        [supervisor] 
exit_code=0
   2025-03-18 22:48:38 [debug    ] Received message from task runner 
[supervisor] msg=SetXCom(key='value from pusher 1', value=[1, 2, 3], 
dag_id='example_xcom', run_id='manual__2025-03-18T17:17:52.732068+00:00', 
task_id='push', map_index=-1, mapped_length=None, type='SetXCom')
   [2025-03-18T22:48:38.275+0530] {_client.py:1025} INFO - HTTP Request: POST 
http://localhost:8000/execution/xcoms/example_xcom/manual__2025-03-18T17:17:52.732068+00:00/push/value%20from%20pusher%201
 "HTTP/1.1 409 Conflict"
   2025-03-18 22:48:38 [warning  ] Server error                   
[airflow.sdk.api.client] detail={'detail': {'reason': 'Unique constraint 
violation', 'statement': 'INSERT INTO xcom (dag_run_id, task_id, map_index, 
"key", dag_id, run_id, value, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?)', 
'orig_error': 'UNIQUE constraint failed: xcom.dag_run_id, xcom.task_id, 
xcom.map_index, xcom.key'}}
   [2025-03-18T22:48:38.277+0530] {local_executor.py:96} ERROR - uhoh
   Traceback (most recent call last):
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/executors/local_executor.py", 
line 92, in _run_worker
       _execute_work(log, workload)
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/executors/local_executor.py", 
line 113, in _execute_work
       supervise(
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py",
 line 1105, in supervise
       exit_code = process.wait()
                   ^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py",
 line 715, in wait
       self._monitor_subprocess()
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py",
 line 773, in _monitor_subprocess
       alive = self._service_subprocess(max_wait_time=max_wait_time) is None
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py",
 line 610, in _service_subprocess
       need_more = socket_handler(key.fileobj)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py",
 line 979, in cb
       gen.send(line)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py",
 line 499, in handle_requests
       self._handle_request(msg, log)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py",
 line 902, in _handle_request
       self.client.xcoms.set(
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/api/client.py",
 line 330, in set
       self.client.post(f"xcoms/{dag_id}/{run_id}/{task_id}/{key}", 
params=params, json=value)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/httpx/_client.py",
 line 1144, in post
       return self.request(
              ^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py",
 line 336, in wrapped_f
       return copy(f, *args, **kw)
              ^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py",
 line 475, in __call__
       do = self.iter(retry_state=retry_state)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py",
 line 376, in iter
       result = action(retry_state)
                ^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py",
 line 398, in <lambda>
       self._add_action_func(lambda rs: rs.outcome.result())
                                        ^^^^^^^^^^^^^^^^^^^
     File "/usr/lib/python3.11/concurrent/futures/_base.py", line 449, in result
       return self.__get_result()
              ^^^^^^^^^^^^^^^^^^^
     File "/usr/lib/python3.11/concurrent/futures/_base.py", line 401, in 
__get_result
       raise self._exception
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py",
 line 478, in __call__
       result = fn(*args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/api/client.py",
 line 486, in request
       return super().request(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/httpx/_client.py",
 line 825, in request
       return self.send(request, auth=auth, follow_redirects=follow_redirects)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/httpx/_client.py",
 line 914, in send
       response = self._send_handling_auth(
                  ^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/httpx/_client.py",
 line 942, in _send_handling_auth
       response = self._send_handling_redirects(
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/httpx/_client.py",
 line 999, in _send_handling_redirects
       raise exc
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/httpx/_client.py",
 line 982, in _send_handling_redirects
       hook(response)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/api/client.py",
 line 111, in raise_on_4xx_5xx_with_note
       return get_json_error(response) or response.raise_for_status()
              ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/api/client.py",
 line 101, in get_json_error
       raise err
   airflow.sdk.api.client.ServerResponseError: Server returned error
   Correlation-id=0195aa43-b2b6-7a7c-88b0-4b75f2432925
   [2025-03-18T22:48:39.216+0530] {scheduler_job_runner.py:941} ERROR - 
Exception when executing SchedulerJob._run_scheduler_loop
   Traceback (most recent call last):
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", 
line 937, in _execute
       self._run_scheduler_loop()
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", 
line 1071, in _run_scheduler_loop
       executor.heartbeat()
     File "/home/karthikeyan/stuff/python/airflow/airflow/traces/tracer.py", 
line 54, in wrapper
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/executors/base_executor.py", 
line 254, in heartbeat
       self.sync()
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/executors/local_executor.py", 
line 205, in sync
       self._read_results()
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/executors/local_executor.py", 
line 210, in _read_results
       key, state, exc = self.result_queue.get()
                         ^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/lib/python3.11/multiprocessing/queues.py", line 367, in get
       return _ForkingPickler.loads(res)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
   TypeError: ServerResponseError.__init__() missing 2 required keyword-only 
arguments: 'request' and 'response'
   [2025-03-18T22:48:39.218+0530] {local_executor.py:216} INFO - Shutting down 
LocalExecutor; waiting for running tasks to finish.  Signal again if you don't 
want to wait.
   [2025-03-18T22:48:39.240+0530] {scheduler_job_runner.py:948} ERROR - 
Exception when executing Executor.end on LocalExecutor(parallelism=32)
   Traceback (most recent call last):
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", 
line 937, in _execute
       self._run_scheduler_loop()
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", 
line 1071, in _run_scheduler_loop
       executor.heartbeat()
     File "/home/karthikeyan/stuff/python/airflow/airflow/traces/tracer.py", 
line 54, in wrapper
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/executors/base_executor.py", 
line 254, in heartbeat
       self.sync()
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/executors/local_executor.py", 
line 205, in sync
       self._read_results()
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/executors/local_executor.py", 
line 210, in _read_results
       key, state, exc = self.result_queue.get()
                         ^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/lib/python3.11/multiprocessing/queues.py", line 367, in get
       return _ForkingPickler.loads(res)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
   TypeError: ServerResponseError.__init__() missing 2 required keyword-only 
arguments: 'request' and 'response'
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", 
line 946, in _execute
       executor.end()
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/executors/local_executor.py", 
line 235, in end
       self._read_results()
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/executors/local_executor.py", 
line 210, in _read_results
       key, state, exc = self.result_queue.get()
                         ^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/lib/python3.11/multiprocessing/queues.py", line 367, in get
       return _ForkingPickler.loads(res)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
   TypeError: ServerResponseError.__init__() missing 2 required keyword-only 
arguments: 'request' and 'response'
   [2025-03-18T22:48:39.242+0530] {scheduler_job_runner.py:953} INFO - Exited 
execute loop
   Traceback (most recent call last):
     File "/home/karthikeyan/stuff/python/airflow/.venv/bin/airflow", line 10, 
in <module>
       sys.exit(main())
                ^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/__main__.py", line 
58, in main
       args.func(args)
     File "/home/karthikeyan/stuff/python/airflow/airflow/cli/cli_config.py", 
line 49, in command
   [2025-03-18 22:48:39 +0530] [13863] [INFO] Handling signal: term
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/utils/cli.py", line 
111, in wrapper
       return f(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/utils/providers_configuration_loader.py",
 line 55, in wrapped_function
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/scheduler_command.py",
 line 52, in scheduler
       run_command_with_daemon_option(
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/daemon_utils.py",
 line 86, in run_command_with_daemon_option
       callback()
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/scheduler_command.py",
 line 55, in <lambda>
       callback=lambda: _run_scheduler_job(args),
                        ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/scheduler_command.py",
 line 43, in _run_scheduler_job
       run_job(job=job_runner.job, execute_callable=job_runner._execute)
     File "/home/karthikeyan/stuff/python/airflow/airflow/utils/session.py", 
line 101, in wrapper
       return func(*args, session=session, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/job.py", line 
342, in run_job
   [2025-03-18 22:48:39 +0530] [13864] [INFO] Worker exiting (pid: 13864)
   [2025-03-18 22:48:39 +0530] [13865] [INFO] Worker exiting (pid: 13865)
       return execute_job(job, execute_callable=execute_callable)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/job.py", line 
371, in execute_job
       ret = execute_callable()
             ^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", 
line 937, in _execute
       self._run_scheduler_loop()
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", 
line 1071, in _run_scheduler_loop
       executor.heartbeat()
     File "/home/karthikeyan/stuff/python/airflow/airflow/traces/tracer.py", 
line 54, in wrapper
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/executors/base_executor.py", 
line 254, in heartbeat
       self.sync()
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/executors/local_executor.py", 
line 205, in sync
       self._read_results()
     File 
"/home/karthikeyan/stuff/python/airflow/airflow/executors/local_executor.py", 
line 210, in _read_results
       key, state, exc = self.result_queue.get()
                         ^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/lib/python3.11/multiprocessing/queues.py", line 367, in get
       return _ForkingPickler.loads(res)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
   TypeError: ServerResponseError.__init__() missing 2 required keyword-only 
arguments: 'request' and 'response'
   [2025-03-18 22:48:39 +0530] [13863] [INFO] Shutting down: Master
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to