internetcoffeephone opened a new issue, #41676:
URL: https://github.com/apache/airflow/issues/41676

   ### Apache Airflow Provider(s)
   
   openlineage
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-openlineage==1.10.0
   
   ### Apache Airflow version
   
   2.9.3
   
   ### Operating System
   
   NAME="Amazon Linux" VERSION="2" ID="amzn"
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   After upgrading the openlineage provider from 1.7.1 to 1.10.0, we have been encountering the following error:
   
   ```
   [2024-08-22, 08:53:52 CEST] {{taskinstance.py:2906}} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/home/airflow/airflow_env/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 466, in _execute_task
       result = _execute_callable(context=context, **execute_callable_kwargs)
     File "/home/airflow/airflow_env/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 433, in _execute_callable
       return execute_callable(context=context, **execute_callable_kwargs)
     File "/home/airflow/airflow_env/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
       return func(self, *args, **kwargs)
     File "/usr/local/airflow/repo/elt/dataflows/operators/check_operator.py", line 43, in execute
       self.set_previous_task_instance_success(context)
     File "/usr/local/airflow/repo/elt/dataflows/operators/check_operator.py", line 70, in set_previous_task_instance_success
       previous_dag_run.update_state()
     File "/home/airflow/airflow_env/lib/python3.10/site-packages/airflow/utils/session.py", line 79, in wrapper
       return func(*args, session=session, **kwargs)
     File "/home/airflow/airflow_env/lib/python3.10/site-packages/airflow/models/dagrun.py", line 821, in update_state
       self.notify_dagrun_state_changed(msg="task_failure")
     File "/home/airflow/airflow_env/lib/python3.10/site-packages/airflow/models/dagrun.py", line 984, in notify_dagrun_state_changed
       get_listener_manager().hook.on_dag_run_failed(dag_run=self, msg=msg)
     File "/home/airflow/airflow_env/lib/python3.10/site-packages/pluggy/_hooks.py", line 513, in __call__
       return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)
     File "/home/airflow/airflow_env/lib/python3.10/site-packages/pluggy/_manager.py", line 120, in _hookexec
       return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
     File "/home/airflow/airflow_env/lib/python3.10/site-packages/pluggy/_callers.py", line 139, in _multicall
       raise exception.with_traceback(exception.__traceback__)
     File "/home/airflow/airflow_env/lib/python3.10/site-packages/pluggy/_callers.py", line 103, in _multicall
       res = hook_impl.function(*args)
     File "/home/airflow/airflow_env/lib/python3.10/site-packages/airflow/providers/openlineage/plugins/listener.py", line 467, in on_dag_run_failed
       self.executor.submit(self.adapter.dag_failed, dag_run=dag_run, msg=msg)
     File "/lib/python3.10/concurrent/futures/process.py", line 738, in submit
       self._start_executor_manager_thread()
     File "/lib/python3.10/concurrent/futures/process.py", line 678, in _start_executor_manager_thread
       self._launch_processes()
     File "/lib/python3.10/concurrent/futures/process.py", line 705, in _launch_processes
       self._spawn_process()
     File "/lib/python3.10/concurrent/futures/process.py", line 714, in _spawn_process
       p.start()
     File "/lib/python3.10/multiprocessing/process.py", line 118, in start
       assert not _current_process._config.get('daemon'), \
   AssertionError: daemonic processes are not allowed to have children
   ```
   
   We have a check operator that runs a check that may fail. On the next run of that check, the operator marks the check task in the previous DAGRun as success, so as not to clutter the Airflow UI, and then calls `update_state()` on the previous DAGRun. This is done so that the previous DAGRun itself is marked as success if and only if all of its tasks are in a success/skipped state.
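
The assertion at the bottom of the traceback above is raised by Python's `multiprocessing` module, not by Airflow code. The following is a minimal sketch that appears to reproduce the same assertion outside Airflow, assuming a POSIX system where the `fork` start method is available. The function names (`_noop`, `_submit_from_daemon`, `run_demo`) are made up for illustration, and the daemonic process only stands in for whatever process runs the task in our deployment.

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


def _noop():
    """Trivial job to hand to the process pool."""
    return None


def _submit_from_daemon(queue):
    # Runs inside a daemonic process (a stand-in for the task process).
    try:
        with ProcessPoolExecutor(max_workers=1) as executor:
            # Submitting work makes the pool start a worker process,
            # which is what a daemonic process is not allowed to do.
            executor.submit(_noop).result()
        queue.put("ok")
    except Exception as exc:
        queue.put(f"{type(exc).__name__}: {exc}")


def run_demo():
    # Assumption: the "fork" start method exists (Linux and other POSIX systems).
    ctx = multiprocessing.get_context("fork")
    queue = ctx.Queue()
    proc = ctx.Process(target=_submit_from_daemon, args=(queue,), daemon=True)
    proc.start()
    message = queue.get(timeout=30)
    proc.join()
    return message


if __name__ == "__main__":
    print(run_demo())
```

If this sketch is representative, any `ProcessPoolExecutor.submit()` call made from a daemonic process would fail in the same way, regardless of what is being submitted.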
   
   ### What you think should happen instead
   
   `dag_run.update_state()` should be callable from within an Operator without error with the OpenLineage provider enabled.
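
One possible direction, sketched here under assumptions and not taken from the provider's code: choose the executor based on whether the current process is daemonic, and fall back to threads when child processes cannot be started. The helper name `make_listener_executor` and its `daemon` parameter are hypothetical.

```python
import multiprocessing
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from typing import Optional


def make_listener_executor(daemon: Optional[bool] = None) -> Executor:
    """Hypothetical helper: return an executor that is usable in this process.

    `daemon` defaults to the daemon flag of the current process; it can be
    passed explicitly to make the choice easy to exercise in isolation.
    """
    if daemon is None:
        daemon = multiprocessing.current_process().daemon
    if daemon:
        # A daemonic process may not start child processes, so use a thread.
        return ThreadPoolExecutor(max_workers=1)
    return ProcessPoolExecutor(max_workers=1)
```

Whether a thread-based fallback is acceptable for emitting lineage events is a design question for the maintainers; the sketch only shows that the daemonic case can be detected before `submit()` is called.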
   
   ### How to reproduce
   
   ```
   import logging
   
   from airflow.models import BaseOperator, DagRun
   from airflow.utils.state import TaskInstanceState
   
   
   class CheckOperator(BaseOperator):
   
       def execute(self, context):
           log = logging.getLogger(__name__)
           log.info("Attempting to set previous task instance state to SUCCESS")
   
           dag_runs = DagRun.find(dag_id=self.dag_id)
           dag_runs.sort(key=lambda x: x.execution_date)
           dag_run_ids = [dag_run.run_id for dag_run in dag_runs]
   
           previous_dag_run_index = dag_run_ids.index(context["dag_run"].run_id) - 1
           previous_dag_run = dag_runs[previous_dag_run_index]
   
           previous_task_instance = previous_dag_run.get_task_instance(self.task_id)
           if previous_task_instance:
               state_changed = previous_task_instance.set_state(TaskInstanceState.SUCCESS)
               if state_changed:
                   # Set the entire DagRun to SUCCESS if all TIs are now set to SUCCESS/SKIPPED
                   previous_dag_run.dag = self.dag
                   previous_dag_run.update_state()
               else:
                   log.info("Previous task instance state was already SUCCESS")
           else:
               log.info("No previous task instances to mark as SUCCESS")
   ```
   
   
   1. Set the `AIRFLOW__OPENLINEAGE__TRANSPORT` environment variable
   2. Add the above operator to a DAG
   3. Trigger a manual DAG run
   4. Mark it as failed, including the tasks
   5. Trigger another manual DAG run
   6. The task in the second run should now fail with the error shown above
   
   ### Anything else
   
   I strongly suspect this issue is caused by https://github.com/apache/airflow/pull/39235, which was [included in 1.8.0](https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/changelog.html#id8), as the issue started for us after upgrading from 1.7.1 to 1.10.0.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

