JonnyWaffles opened a new issue, #30029:
URL: https://github.com/apache/airflow/issues/30029
### Apache Airflow version
2.5.1
### What happened
Executor: Celery
Airflow V 2.5.1
Platform: Python 3.9
OS: ubuntu20.04
Hi team, this one is pretty wild.
To integrate with our CI/CD, we run pytest as a Task inside a Dag, like so:
```python
import tempfile
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Dict, Union

import pytest
from airflow.decorators import task
from airflow.models import Variable

# PYTEST_RET_CODE_XCOM_KEY, JUNIT_XML_XCOM_KEY, and
# JUNIT_XML_EXCEPTION_STRING are module-level constants defined elsewhere.

@task(multiple_outputs=True)
def run_pytest(params=None) -> Dict[str, Union[int, str]]:
    with tempfile.NamedTemporaryFile() as f:
        pytest_args = (params or {}).get("pytest_args") or Variable.get(
            "pytest_args"
        ).split()
        args = [*pytest_args, "--ignore", __file__, f"--junitxml={f.name}"]
        code = pytest.main(args)
        xml_text = Path(f.name).read_text()
        try:
            ET.fromstring(xml_text)
        except ET.ParseError as err:
            xml_text = f"{JUNIT_XML_EXCEPTION_STRING} Original exception: {err}"
        return {PYTEST_RET_CODE_XCOM_KEY: code, JUNIT_XML_XCOM_KEY: xml_text}
```
This has been working great! And I was excited to use the new `dag.test`
command to run our test dags inside the pipeline. Unfortunately, if you
`dag.test` any Dag with dynamically mapped tasks, the logging system triggers a
recursive exception.
You can easily recreate this behavior by creating the following test case
```python
from airflow.example_dags.example_dynamic_task_mapping import dag


def test_map_dag():
    dag.test()
```
Then execute it as a task using the `run_pytest` task above. You'll see a
stack trace like:
```
File "/app/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 149, in write
    self.flush()
File "/app/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 156, in flush
    self._propagate_log(buf)
File "/app/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 137, in _propagate_log
    self.logger.log(self.level, remove_escape_codes(message))
File "/usr/lib/python3.9/logging/__init__.py", line 1512, in log
    self._log(level, msg, args, **kwargs)
File "/usr/lib/python3.9/logging/__init__.py", line 1589, in _log
    self.handle(record)
File "/usr/lib/python3.9/logging/__init__.py", line 1599, in handle
    self.callHandlers(record)
File "/usr/lib/python3.9/logging/__init__.py", line 1661, in callHandlers
    hdlr.handle(record)
File "/usr/lib/python3.9/logging/__init__.py", line 952, in handle
    self.emit(record)
File "/usr/lib/python3.9/logging/__init__.py", line 1091, in emit
    self.handleError(record)
File "/usr/lib/python3.9/logging/__init__.py", line 1004, in handleError
    sys.stderr.write('--- Logging error ---\n')
File "/app/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 149, in write
    self.flush()
File "/app/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 156, in flush
    self._propagate_log(buf)
File "/app/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 137, in _propagate_log
```
and so on. You can see the cycle between `flush` and `_propagate_log`.
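The same cycle can be reproduced outside Airflow with plain `logging`: a file-like writer that hands everything it receives back to a logger, whose handler in turn writes into that same writer. The `PropagatingWriter` class below is a hypothetical simplification of Airflow's `StreamLogWriter`, not its actual implementation, just to illustrate the loop.

```python
import logging


class PropagatingWriter:
    """File-like shim that hands every write back to a logger.

    Hypothetical simplification of StreamLogWriter from
    airflow.utils.log.logging_mixin, to illustrate the cycle only.
    """

    def __init__(self, logger):
        self.logger = logger
        self._buf = ""

    def write(self, message):
        self._buf += message
        self.flush()

    def flush(self):
        if self._buf:
            msg, self._buf = self._buf, ""
            # The _propagate_log step: feed the text back to the logger.
            self.logger.log(logging.INFO, msg)


logger = logging.getLogger("recursion-demo")
logger.setLevel(logging.INFO)
writer = PropagatingWriter(logger)
# The bug scenario: a handler on that same logger writes *into* the
# writer, so emit -> write -> flush -> log -> emit -> ... forever.
logger.addHandler(logging.StreamHandler(writer))

caught = False
try:
    logger.info("hello")
except RecursionError:
    caught = True  # the write/flush/_propagate_log cycle blew the stack
print("RecursionError reproduced:", caught)
```

In the real traceback the loop runs through `handleError` as well, because `sys.stderr` is also redirected to a propagating writer, so even the logging module's error fallback re-enters the cycle.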
### What you think should happen instead
It would be nice to be able to run `dag.test()` inside a PythonOperator
Task so we can easily run pytest as a remotely triggered Dag.
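Until that works, one possible workaround (an untested sketch, not part of Airflow's API) is to shell out to a fresh interpreter from inside the task, so pytest and `dag.test()` write to real file descriptors instead of the worker's redirected streams:

```python
import subprocess
import sys


def run_in_subprocess(argv):
    """Run the given Python arguments in a fresh interpreter.

    Hypothetical helper: the child process gets real stdout/stderr
    file descriptors, so the worker's in-process stream redirection
    never wraps the pytest/dag.test logging.
    """
    result = subprocess.run(
        [sys.executable, *argv],
        capture_output=True,
        text=True,
    )
    return result.returncode, result.stdout


# Inside the task body one might call, e.g.:
#     code, report = run_in_subprocess(["-m", "pytest", "tests/"])
```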
### How to reproduce
1. Create a test case
```python
from airflow.example_dags.example_dynamic_task_mapping import dag


def test_map_dag():
    dag.test()
```
2. Run the test case via Pytest inside a PythonOperator Task (using
CeleryExecutor)
### Operating System
ubuntu20.04
### Versions of Apache Airflow Providers
You don't need any providers to reproduce
### Deployment
Docker-Compose
### Deployment details
The scheduler, worker, webserver, and Redis all run inside a local Docker
Compose setup: a single worker container with default Celery settings.
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]