JonnyWaffles opened a new issue, #30029:
URL: https://github.com/apache/airflow/issues/30029

   ### Apache Airflow version
   
   2.5.1
   
   ### What happened
   
   Executor: Celery
   Airflow V 2.5.1
   Platform: Python 3.9
   OS: ubuntu20.04
   
   Hi team, this one is pretty wild.
   
   In order to integrate with our CICD, we run Pytest as a Task inside a Dag 
like so
   
   ```python
   @task(multiple_outputs=True)
   def run_pytest(params=None) -> Dict[str, Union[int, str]]:
       with tempfile.NamedTemporaryFile() as f:
           pytest_args = (params or {}).get("pytest_args") or Variable.get(
               "pytest_args"
           ).split()
           args = [*pytest_args, "--ignore", __file__, f"--junitxml={f.name}"]
           code = pytest.main(args)
           xml_text = Path(f.name).read_text()
           try:
               ET.fromstring(xml_text)
           except ET.ParseError as err:
               xml_text = f"{JUNIT_XML_EXCEPTION_STRING} " f"Original 
exception: {err}"
   
       return {PYTEST_RET_CODE_XCOM_KEY: code, JUNIT_XML_XCOM_KEY: xml_text}
   ```
   This has been working great! And I was excited to use the new `dag.test` 
command to run our test dags inside the pipeline. Unfortunately, if you 
`dag.test` any Dag with dynamically mapped tasks, the logging system triggers a 
recursive exception.
   
   You can easily recreate this behavior by creating the following test case
   ```python
   from airflow.example_dags.example_dynamic_task_mapping import dag
   
   def test_map_dag():
       dag.test()
   ```
   And then executing it as a  task using the above `run_pytest`. You'll see a 
stack trace like
   ```
    File 
"/app/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", 
line 149, in write
   2023-03-10T20:18:54.316081052Z     self.flush()
   2023-03-10T20:18:54.316083250Z   File 
"/app/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", 
line 156, in flush
   2023-03-10T20:18:54.316085497Z     self._propagate_log(buf)
   2023-03-10T20:18:54.316088048Z   File 
"/app/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", 
line 137, in _propagate_log
   2023-03-10T20:18:54.316090370Z     self.logger.log(self.level, 
remove_escape_codes(message))
   2023-03-10T20:18:54.316092740Z   File 
"/usr/lib/python3.9/logging/__init__.py", line 1512, in log
   2023-03-10T20:18:54.316095067Z     self._log(level, msg, args, **kwargs)
   2023-03-10T20:18:54.316097285Z   File 
"/usr/lib/python3.9/logging/__init__.py", line 1589, in _log
   2023-03-10T20:18:54.316102712Z     self.handle(record)
   2023-03-10T20:18:54.316105214Z   File 
"/usr/lib/python3.9/logging/__init__.py", line 1599, in handle
   2023-03-10T20:18:54.316107414Z     self.callHandlers(record)
   2023-03-10T20:18:54.316109533Z   File 
"/usr/lib/python3.9/logging/__init__.py", line 1661, in callHandlers
   2023-03-10T20:18:54.316111769Z     hdlr.handle(record)
   2023-03-10T20:18:54.316113875Z   File 
"/usr/lib/python3.9/logging/__init__.py", line 952, in handle
   2023-03-10T20:18:54.316116060Z     self.emit(record)
   2023-03-10T20:18:54.316118210Z   File 
"/usr/lib/python3.9/logging/__init__.py", line 1091, in emit
   2023-03-10T20:18:54.316120464Z     self.handleError(record)
   2023-03-10T20:18:54.316122649Z   File 
"/usr/lib/python3.9/logging/__init__.py", line 1004, in handleError
   2023-03-10T20:18:54.316124890Z     sys.stderr.write('--- Logging error 
---\n')
   2023-03-10T20:18:54.316127043Z   File 
"/app/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", 
line 149, in write
   2023-03-10T20:18:54.316129277Z     self.flush()
   2023-03-10T20:18:54.316131450Z   File 
"/app/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", 
line 156, in flush
   2023-03-10T20:18:54.316133804Z     self._propagate_log(buf)
   2023-03-10T20:18:54.316135984Z   File 
"/app/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", 
line 137, in _propagate_log
   ```
   etc etc. You can see the cycle between `flush` and `propagate_log`
   
   
   ### What you think should happen instead
   
   It would be nice to be able to run a Dag.test command inside a 
PythonOperator Task so we can easily run pytest as a remotely triggered Dag.
   
   ### How to reproduce
   
   1. Create a test case
       ```python
       from airflow.example_dags.example_dynamic_task_mapping import dag
   
       def test_map_dag():
           dag.test()
       ```
   2. Run the test case via Pytest inside a PythonOperator Task (using 
CeleryExecutor) 
   
   ### Operating System
   
   ubuntu20.04
   
   ### Versions of Apache Airflow Providers
   
   You don't need any providers to reproduce
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   Running scheduler, worker, webserver, and Redis, are inside a local docker 
compose. Single worker container with default Celery settings.
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to