vincbeck opened a new issue, #40247:
URL: https://github.com/apache/airflow/issues/40247

   ### Apache Airflow version
   
   2.9.2
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   As part of #40205, a `use_executor` option was added to `dag.test` to 
run the DAG through an executor. This works well with non-local 
executors such as `AwsEcsExecutor`. However, with executors that run tasks 
on the same machine, such as `LocalExecutor` or `CeleryExecutor`, `dag.test` exits with errors.
   
   ```
   [2024-06-14T15:44:21.306+0000] {base_executor.py:149} INFO - Adding to 
queue: ['airflow', 'tasks', 'run', 'example_sns', 'variable_fetcher', 
'manual__2024-06-14T15:43:59.429752+00:00', '--force', '--local', '--pool', 
'default_pool', '--subdir', 
'/opt/airflow/tests/system/providers/amazon/aws/example_sns.py']
   [2024-06-14T15:44:21.308+0000] {local_executor.py:90} INFO - 
QueuedLocalWorker running ['airflow', 'tasks', 'run', 'example_sns', 
'variable_fetcher', 'manual__2024-06-14T15:43:59.429752+00:00', '--force', 
'--local', '--pool', 'default_pool', '--subdir', 
'/opt/airflow/tests/system/providers/amazon/aws/example_sns.py']
   Traceback (most recent call last):
     File "/usr/local/bin/airflow", line 8, in <module>
       sys.exit(main())
     File "/opt/airflow/airflow/__main__.py", line 58, in main
       args.func(args)
     File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
       return f(*args, **kwargs)
     File "/opt/airflow/airflow/utils/providers_configuration_loader.py", line 
55, in wrapped_function
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/utils/session.py", line 84, in wrapper
       return func(*args, session=session, **kwargs)
     File "/opt/airflow/airflow/cli/commands/dag_command.py", line 611, in 
dag_test
       dr: DagRun = dag.test(
     File "/opt/airflow/airflow/utils/session.py", line 81, in wrapper
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/models/dag.py", line 2984, in test
       triggerer_running = _triggerer_is_healthy()
     File "/opt/airflow/airflow/models/dag.py", line 301, in 
_triggerer_is_healthy
       job = TriggererJobRunner.most_recent_job()
     File "/opt/airflow/airflow/utils/session.py", line 84, in wrapper
       return func(*args, session=session, **kwargs)
     File "/opt/airflow/airflow/jobs/base_job_runner.py", line 71, in 
most_recent_job
       return most_recent_job(cls.job_type, session=session)
     File "/opt/airflow/airflow/api_internal/internal_api_call.py", line 127, 
in wrapper
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/utils/session.py", line 81, in wrapper
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/jobs/job.py", line 384, in most_recent_job
       return session.scalar(
     File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", 
line 1753, in scalar
       ).scalar()
     File 
"/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/result.py", line 
1276, in scalar
       return self._only_one_row(
     File 
"/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/result.py", line 
559, in _only_one_row
       row = onerow(hard_close=True)
     File 
"/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/result.py", line 
1801, in _fetchone_impl
       row = next(self.iterator, _NO_ROW)
     File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/loading.py", 
line 147, in chunks
       fetch = cursor._raw_all_rows()
     File 
"/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/result.py", line 
393, in _raw_all_rows
       return [make_row(row) for row in rows]
     File 
"/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/result.py", line 
393, in <listcomp>
       return [make_row(row) for row in rows]
   RuntimeError: number of values in row (1) differ from number of column 
processors (10)
   [2024-06-14T15:44:21.334+0000] {local_executor.py:192} INFO - Failed to read 
tasks from the task queue because the other end has closed the connection. 
Terminating worker QueuedLocalWorker-24.
   ```
   
   After investigation, the issue appears to be that the executor closes the DB 
connection while executing the task, which then makes `dag.test` fail because it 
still needs a DB connection 
([example](https://github.com/apache/airflow/blob/main/airflow/models/dag.py#L2957)).
 That would explain why cloud-based executors work: with them, `dag.test` and the 
task execution do not happen on the same machine.
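
   The underlying mechanism can be shown outside Airflow with a small, purely illustrative sketch (plain sockets, no Airflow or SQLAlchemy involved): when a forked child tears down a connection it inherited from its parent, the parent's side of that same connection breaks too, even though the parent never closed anything itself.

   ```python
import os
import socket

# The parent holds an open connection (a stand-in for the DB connection
# backing the ORM session that `dag.test` keeps using).
server, client = socket.socketpair()

pid = os.fork()
if pid == 0:
    # Forked child (a stand-in for the executor's worker process): it
    # inherits the same underlying socket and shuts it down, much like
    # disposing the ORM closes the shared DB connection.
    client.shutdown(socket.SHUT_RDWR)
    os._exit(0)

os.waitpid(pid, 0)

# Back in the parent: the connection object was never closed here, yet
# using it now fails because the child tore down the shared socket.
broken = False
try:
    client.sendall(b"SELECT 1")
except OSError:
    broken = True
print("parent connection broken:", broken)
   ```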
   
   Investigation note: as mentioned in this 
[comment](https://github.com/apache/airflow/pull/40205#issuecomment-2168424139),
 commenting out 
https://github.com/apache/airflow/blob/main/airflow/cli/commands/task_command.py#L470
 keeps it failing, but one task further: the first task 
succeeds, and then the second one fails for the same reason.
   
   ### What you think should happen instead?
   
   When `dag.test` is used with `use_executor` (`dag.test(use_executor=True)`) 
and the executor configured in the Airflow environment is either 
`LocalExecutor` or `CeleryExecutor`, it should successfully execute the DAG 
using that executor.
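
   For context on what a fix might look like (a sketch only, not Airflow's actual code, and all names below are hypothetical): the usual way to make forked workers safe is for each worker to open its own DB connection after the fork, so closing it cannot affect the parent's session. Illustrated with stdlib `sqlite3`:

   ```python
import os
import sqlite3
import tempfile

# A throwaway on-disk database standing in for the Airflow metadata DB.
db_path = tempfile.NamedTemporaryFile(suffix=".db", delete=False).name

parent_conn = sqlite3.connect(db_path)
parent_conn.execute("CREATE TABLE job (id INTEGER)")
parent_conn.commit()

pid = os.fork()
if pid == 0:
    # Worker process: opens its OWN connection rather than reusing the
    # parent's, so closing it tears down nothing the parent depends on.
    worker_conn = sqlite3.connect(db_path)
    worker_conn.execute("INSERT INTO job VALUES (1)")
    worker_conn.commit()
    worker_conn.close()
    os._exit(0)

os.waitpid(pid, 0)

# The parent's connection is still healthy and sees the worker's commit.
rows = parent_conn.execute("SELECT id FROM job").fetchall()
print(rows)
   ```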
   
   ### How to reproduce
   
   Run `dag.test(use_executor=True)` with `LocalExecutor` or `CeleryExecutor` 
configured as the executor.
   
   ### Operating System
   
   macOS
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

