vincbeck opened a new issue, #40247:
URL: https://github.com/apache/airflow/issues/40247
### Apache Airflow version
2.9.2
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
As part of #40205, an option `use_executor` was added to `dag.test` to run the DAG using an executor. This works well with non-local executors such as `AwsEcsExecutor`. However, with local executors such as `LocalExecutor` or `CeleryExecutor`, `dag.test` exits with errors:
```
[2024-06-14T15:44:21.306+0000] {base_executor.py:149} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_sns', 'variable_fetcher', 'manual__2024-06-14T15:43:59.429752+00:00', '--force', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/tests/system/providers/amazon/aws/example_sns.py']
[2024-06-14T15:44:21.308+0000] {local_executor.py:90} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'example_sns', 'variable_fetcher', 'manual__2024-06-14T15:43:59.429752+00:00', '--force', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/tests/system/providers/amazon/aws/example_sns.py']
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/opt/airflow/airflow/__main__.py", line 58, in main
    args.func(args)
  File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
    return f(*args, **kwargs)
  File "/opt/airflow/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/utils/session.py", line 84, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow/cli/commands/dag_command.py", line 611, in dag_test
    dr: DagRun = dag.test(
  File "/opt/airflow/airflow/utils/session.py", line 81, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/models/dag.py", line 2984, in test
    triggerer_running = _triggerer_is_healthy()
  File "/opt/airflow/airflow/models/dag.py", line 301, in _triggerer_is_healthy
    job = TriggererJobRunner.most_recent_job()
  File "/opt/airflow/airflow/utils/session.py", line 84, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow/jobs/base_job_runner.py", line 71, in most_recent_job
    return most_recent_job(cls.job_type, session=session)
  File "/opt/airflow/airflow/api_internal/internal_api_call.py", line 127, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/utils/session.py", line 81, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/jobs/job.py", line 384, in most_recent_job
    return session.scalar(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1753, in scalar
    ).scalar()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/result.py", line 1276, in scalar
    return self._only_one_row(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/result.py", line 559, in _only_one_row
    row = onerow(hard_close=True)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/result.py", line 1801, in _fetchone_impl
    row = next(self.iterator, _NO_ROW)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/loading.py", line 147, in chunks
    fetch = cursor._raw_all_rows()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/result.py", line 393, in _raw_all_rows
    return [make_row(row) for row in rows]
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/result.py", line 393, in <listcomp>
    return [make_row(row) for row in rows]
RuntimeError: number of values in row (1) differ from number of column processors (10)
[2024-06-14T15:44:21.334+0000] {local_executor.py:192} INFO - Failed to read tasks from the task queue because the other end has closed the connection. Terminating worker QueuedLocalWorker-24.
```
After investigation, it seems the executor closes the DB connection while executing the task, which then makes `dag.test` fail because it still needs a DB connection ([example](https://github.com/apache/airflow/blob/main/airflow/models/dag.py#L2957)). That would explain why cloud-based executors work: `dag.test` and the task execution do not happen on the same machine.
Investigation note: as mentioned in this [comment](https://github.com/apache/airflow/pull/40205#issuecomment-2168424139), commenting out https://github.com/apache/airflow/blob/main/airflow/cli/commands/task_command.py#L470 keeps it failing for the same reason, but one task further: the first task succeeds, and then the second one fails the same way.
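Outside Airflow, the suspected mechanism can be sketched with plain SQLAlchemy: a pooled DBAPI connection created before a fork is shared with the child process, and anything the child does to it can break the parent's session. This is only an illustration, not Airflow's actual code path; the connection URL is a placeholder, and `Engine.dispose(close=False)` is SQLAlchemy's documented post-fork recipe, shown here for contrast.
```python
import os

from sqlalchemy import create_engine, text

# Placeholder URL (assumption): any pooled DBAPI driver behaves the same way.
engine = create_engine("postgresql+psycopg2://user:pass@localhost/db")

with engine.connect() as conn:
    conn.execute(text("SELECT 1"))  # the pool now holds a live connection

pid = os.fork()
if pid == 0:
    # Child: the inherited pool shares sockets with the parent. Closing those
    # connections from here can leave the parent mid-protocol, producing
    # errors like "number of values in row ... differ from number of column
    # processors". SQLAlchemy's recipe is to discard the inherited pool
    # WITHOUT closing the underlying sockets:
    engine.dispose(close=False)
    os._exit(0)

os.waitpid(pid, 0)
with engine.connect() as conn:
    # Parent: safe only because the child discarded, rather than closed,
    # the shared connection.
    print(conn.execute(text("SELECT 1")).scalar())
```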
### What you think should happen instead?
When `dag.test` is used with `use_executor` (`dag.test(use_executor=True)`) and the executor configured in the Airflow environment is `LocalExecutor` or `CeleryExecutor`, it should successfully execute the DAG using that executor.
### How to reproduce
Run `dag.test(use_executor=True)` with `LocalExecutor` or `CeleryExecutor` configured as the executor, as in the sketch below.
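A minimal reproduction sketch, assuming `AIRFLOW__CORE__EXECUTOR=LocalExecutor` (or `CeleryExecutor`) is set and the metadata database is initialized; the DAG id and task names are made up for the example:
```python
import datetime

from airflow.decorators import task
from airflow.models.dag import DAG

with DAG(
    dag_id="repro_use_executor",
    start_date=datetime.datetime(2024, 1, 1),
    schedule=None,
) as dag:

    @task
    def first():
        print("first task")

    @task
    def second():
        print("second task")

    # Two tasks, to match the investigation note: the first succeeds,
    # the second fails the same way.
    first() >> second()


if __name__ == "__main__":
    # With a local executor this exits with the RuntimeError shown above;
    # with the default use_executor=False it runs fine.
    dag.test(use_executor=True)
```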
### Operating System
MacOS
### Versions of Apache Airflow Providers
_No response_
### Deployment
Other
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)