internetcoffeephone opened a new issue, #41676:
URL: https://github.com/apache/airflow/issues/41676
### Apache Airflow Provider(s)
openlineage
### Versions of Apache Airflow Providers
apache-airflow-providers-openlineage==1.10.0
### Apache Airflow version
2.9.3
### Operating System
NAME="Amazon Linux" VERSION="2" ID="amzn"
### Deployment
Other Docker-based deployment
### Deployment details
_No response_
### What happened
After upgrading the openlineage provider from 1.7.1 to 1.10.0 we have been
encountering the following error:
```
[2024-08-22, 08:53:52 CEST] {{taskinstance.py:2906}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/airflow_env/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 466, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/airflow_env/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 433, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/airflow_env/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/airflow/repo/elt/dataflows/operators/check_operator.py", line 43, in execute
    self.set_previous_task_instance_success(context)
  File "/usr/local/airflow/repo/elt/dataflows/operators/check_operator.py", line 70, in set_previous_task_instance_success
    previous_dag_run.update_state()
  File "/home/airflow/airflow_env/lib/python3.10/site-packages/airflow/utils/session.py", line 79, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/airflow_env/lib/python3.10/site-packages/airflow/models/dagrun.py", line 821, in update_state
    self.notify_dagrun_state_changed(msg="task_failure")
  File "/home/airflow/airflow_env/lib/python3.10/site-packages/airflow/models/dagrun.py", line 984, in notify_dagrun_state_changed
    get_listener_manager().hook.on_dag_run_failed(dag_run=self, msg=msg)
  File "/home/airflow/airflow_env/lib/python3.10/site-packages/pluggy/_hooks.py", line 513, in __call__
    return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)
  File "/home/airflow/airflow_env/lib/python3.10/site-packages/pluggy/_manager.py", line 120, in _hookexec
    return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
  File "/home/airflow/airflow_env/lib/python3.10/site-packages/pluggy/_callers.py", line 139, in _multicall
    raise exception.with_traceback(exception.__traceback__)
  File "/home/airflow/airflow_env/lib/python3.10/site-packages/pluggy/_callers.py", line 103, in _multicall
    res = hook_impl.function(*args)
  File "/home/airflow/airflow_env/lib/python3.10/site-packages/airflow/providers/openlineage/plugins/listener.py", line 467, in on_dag_run_failed
    self.executor.submit(self.adapter.dag_failed, dag_run=dag_run, msg=msg)
  File "/lib/python3.10/concurrent/futures/process.py", line 738, in submit
    self._start_executor_manager_thread()
  File "/lib/python3.10/concurrent/futures/process.py", line 678, in _start_executor_manager_thread
    self._launch_processes()
  File "/lib/python3.10/concurrent/futures/process.py", line 705, in _launch_processes
    self._spawn_process()
  File "/lib/python3.10/concurrent/futures/process.py", line 714, in _spawn_process
    p.start()
  File "/lib/python3.10/multiprocessing/process.py", line 118, in start
    assert not _current_process._config.get('daemon'), \
AssertionError: daemonic processes are not allowed to have children
```
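The last frames show the OpenLineage listener calling `submit()` on a `concurrent.futures` process pool (`concurrent/futures/process.py`), which has to start a worker process, while the calling process is daemonic. The Airflow-free sketch below reproduces the same assertion with the standard library only. It assumes, as the assertion implies, that the task code runs inside a daemonic process, and it uses the POSIX-only `fork` start method; `_noop`, `_task_body` and `reproduce` are hypothetical names.

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


def _noop() -> None:
    """Stand-in for the listener's work (e.g. adapter.dag_failed)."""


def _task_body(result_queue) -> None:
    # Runs inside a daemonic process, like the task process in the traceback.
    executor = ProcessPoolExecutor(max_workers=1)
    try:
        # submit() has to start a worker process, which daemonic processes may not do.
        executor.submit(_noop)
        result_queue.put("submitted")
    except AssertionError as exc:
        result_queue.put(f"AssertionError: {exc}")


def reproduce() -> str:
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX platform
    result_queue = ctx.Queue()
    proc = ctx.Process(target=_task_body, args=(result_queue,), daemon=True)
    proc.start()
    message = result_queue.get(timeout=30)
    proc.join()
    return message


if __name__ == "__main__":
    print(reproduce())
```

Running it should print the same `AssertionError` message as the last line of the traceback.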
We have a check operator that runs a check which may fail. On the next run of
that check, the operator marks the check task in the previous DagRun as
successful, so that stale failures do not clutter the Airflow UI, and then calls
`update_state()` on the previous DagRun. This way the previous DagRun itself is
marked as successful if and only if all of its tasks are in a success or skipped
state.
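The intended effect on the previous DagRun can be modelled in plain Python. This is a deliberately simplified sketch of the rule described above ("success if and only if every task is success or skipped"), not Airflow's `update_state()`, which also handles unfinished tasks, leaf tasks and other cases; the function name and the plain-string states are assumptions.

```python
# Simplified stand-ins for TaskInstanceState values (assumption: plain strings).
FINISHED_OK = {"success", "skipped"}


def mark_success_and_evaluate(
    ti_states: dict[str, str], task_id: str
) -> tuple[dict[str, str], bool]:
    """Hypothetical model of the check operator's intent, not Airflow code."""
    new_states = dict(ti_states)
    new_states[task_id] = "success"  # the check task is marked as success
    # The run may become successful only if every task is success or skipped.
    run_succeeds = all(state in FINISHED_OK for state in new_states.values())
    return new_states, run_succeeds


previous_run = {"extract": "success", "check": "failed", "load": "skipped"}
_, ok = mark_success_and_evaluate(previous_run, "check")
print(ok)  # True
```

If any other task in the previous run had also failed, the run would stay failed, which matches the "if and only if" condition above.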
### What you think should happen instead
`dag_run.update_state()` should be callable from within an operator without
error when the OpenLineage provider is enabled.
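One possible direction, sketched here only as an illustration and not as the provider's actual fix: daemonic processes may still start threads, so a listener could fall back to a thread pool when it detects that it runs in a daemonic process. `make_listener_executor`, `_emit_event` and `run_in_daemon` are hypothetical names, and the demo assumes the POSIX-only `fork` start method.

```python
import multiprocessing
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor


def make_listener_executor(max_workers: int = 1) -> Executor:
    """Hypothetical helper, not code from the OpenLineage provider."""
    if multiprocessing.current_process().daemon:
        # Daemonic processes may not spawn children, but threads are allowed.
        return ThreadPoolExecutor(max_workers=max_workers)
    return ProcessPoolExecutor(max_workers=max_workers)


def _emit_event(name: str) -> str:
    # Stand-in for the listener's work, e.g. adapter.dag_failed.
    return f"emitted {name}"


def _daemon_task(result_queue) -> None:
    with make_listener_executor() as executor:
        future = executor.submit(_emit_event, "dag_failed")
        result_queue.put((type(executor).__name__, future.result(timeout=30)))


def run_in_daemon():
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX platform
    result_queue = ctx.Queue()
    proc = ctx.Process(target=_daemon_task, args=(result_queue,), daemon=True)
    proc.start()
    result = result_queue.get(timeout=60)
    proc.join()
    return result


if __name__ == "__main__":
    print(run_in_daemon())  # ('ThreadPoolExecutor', 'emitted dag_failed')
```

The trade-off is that work in a thread pool shares the task process and ends with it, whereas a separate process isolates the listener from the task.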
### How to reproduce
```python
import logging

from airflow.models import BaseOperator, DagRun
from airflow.utils.state import TaskInstanceState


class CheckOperator(BaseOperator):
    def execute(self, context):
        log = logging.getLogger(__name__)
        log.info("Attempting to set previous task instance state to SUCCESS")

        # All runs of this DAG, oldest first; the current run is among them.
        dag_runs = DagRun.find(dag_id=self.dag_id)
        dag_runs.sort(key=lambda x: x.execution_date)
        dag_run_ids = [dag_run.run_id for dag_run in dag_runs]

        previous_dag_run_index = dag_run_ids.index(context["dag_run"].run_id) - 1
        if previous_dag_run_index < 0:
            # Without this guard, index -1 would wrap around to the newest run.
            log.info("No previous DAG run found")
            return
        previous_dag_run = dag_runs[previous_dag_run_index]

        previous_task_instance = previous_dag_run.get_task_instance(self.task_id)
        if previous_task_instance:
            state_changed = previous_task_instance.set_state(TaskInstanceState.SUCCESS)
            if state_changed:
                # Set the entire DagRun to SUCCESS if all TIs are now SUCCESS/SKIPPED.
                # With the OpenLineage provider enabled, this call raises
                # "AssertionError: daemonic processes are not allowed to have children".
                previous_dag_run.dag = self.dag
                previous_dag_run.update_state()
            else:
                log.info("Previous task instance state was already SUCCESS")
        else:
            log.info("No previous task instances to mark as SUCCESS")
```
1. Set the `AIRFLOW__OPENLINEAGE__TRANSPORT` environment variable to enable OpenLineage
2. Add the above operator to a DAG
3. Trigger a manual DAG run
4. Mark that run as failed, including its tasks
5. Trigger another manual DAG run
6. The check task in the second run fails with the error above
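For step 1, Airflow maps environment variables of the form `AIRFLOW__{SECTION}__{KEY}` to configuration options, and the OpenLineage provider reads `[openlineage] transport` as a JSON string. The sketch below only builds that value; the issue does not say which transport was used, so the `console` transport (which logs events instead of sending them to a backend) is an assumption, and `openlineage_transport_env` is a hypothetical helper.

```python
import json


def openlineage_transport_env(transport: dict) -> dict[str, str]:
    """Hypothetical helper: serialize a transport config into Airflow's env-var form."""
    return {"AIRFLOW__OPENLINEAGE__TRANSPORT": json.dumps(transport)}


# Assumption: the console transport, chosen because it needs no lineage backend.
env = openlineage_transport_env({"type": "console"})
print(env["AIRFLOW__OPENLINEAGE__TRANSPORT"])  # {"type": "console"}
```

The resulting mapping can be passed to the Airflow containers' environment, for example via the Docker deployment's environment settings.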
### Anything else
I strongly suspect this issue was introduced by
https://github.com/apache/airflow/pull/39235, which was [included in
1.8.0](https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/changelog.html#id8),
since the problem started for us after upgrading from 1.7.1 to 1.10.0.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)