paul-laffon-dd opened a new issue, #44984:
URL: https://github.com/apache/airflow/issues/44984

   ### Apache Airflow Provider(s)
   
   openlineage
   
   ### Versions of Apache Airflow Providers
   
   1.14.0
   
   I am also seeing missing DAG start events with versions <= 1.12.0. However, 
those versions did not log the exception, so it is hard to tell whether this 
is the same issue.
   
   ### Apache Airflow version
   
   2.10.1
   
   ### Operating System
   
   Amazon Linux
   
   ### Deployment
   
   Amazon (AWS) MWAA
   
   ### Deployment details
   
   MWAA with:
   - requirements.txt with `apache-airflow-providers-openlineage==1.14.0`
   - startup.sh with `OPENLINEAGE_URL` pointing to a webserver logging all 
received requests
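
For anyone who wants to replicate the setup, a request-logging endpoint along these lines is enough to see which events arrive. This is only a minimal sketch, not the exact server used in this report: the names `LoggingHandler`, `received` and `start_server` are hypothetical, and the `/api/v1/lineage` path mentioned below is an assumption about the HTTP transport's default endpoint.

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

# Every POST seen by the server, stored as (path, decoded body).
received = []


class LoggingHandler(BaseHTTPRequestHandler):
    """Hypothetical handler that records and prints each POSTed event."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        try:
            payload = json.loads(body)
        except ValueError:
            # Keep non-JSON bodies as text so nothing is silently dropped.
            payload = body.decode("utf-8", errors="replace")
        received.append((self.path, payload))
        print(self.path, payload)
        self.send_response(200)
        self.end_headers()


def start_server(port=0):
    """Serve in a background thread; port 0 lets the OS pick a free port."""
    server = HTTPServer(("127.0.0.1", port), LoggingHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

Calling `start_server(5000)` and pointing `OPENLINEAGE_URL` at that host and port should then show each event (assumed to be POSTed to `/api/v1/lineage`), which makes a missing DAG start event easy to spot by comparing against the scheduled runs.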
   
   ### What happened
   
   The OpenLineage provider failed to send some DAG start events and logged 
the following exception in the scheduler logs:
   
   ```
   [2024-12-17T00:44:00.564+0000] {listener.py:528} WARNING - Failed to submit method to executor
   concurrent.futures.process._RemoteTraceback:
   """
   Traceback (most recent call last):
     File "/usr/local/lib/python3.11/multiprocessing/queues.py", line 244, in _feed
       obj = _ForkingPickler.dumps(obj)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/multiprocessing/reduction.py", line 51, in dumps
       cls(buf, protocol).dump(obj)
   _pickle.PicklingError: Can't pickle <functools._lru_cache_wrapper object at 0x7fb8f1c02980>: it's not the same object as airflow.models.abstractoperator.AbstractOperator.get_parse_time_mapped_ti_count
   """
   ```
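
To illustrate what this kind of `PicklingError` means, here is a minimal sketch that is independent of Airflow. An `lru_cache` wrapper is pickled by reference, i.e. by its qualified name, so pickling fails if the attribute at that name no longer holds the very same wrapper object. The class `Operator` and the method `mapped_ti_count` are hypothetical stand-ins, and replacing the attribute by hand is only one assumed way to reach this state; it is not confirmed to be what happens in the scheduler.

```python
import functools
import pickle


class Operator:
    """Hypothetical stand-in for a class with an lru_cache-decorated method."""

    @functools.lru_cache(maxsize=None)
    def mapped_ti_count(self):
        return 0


# Keep a reference to the wrapper that was created with the class.
stale_wrapper = Operator.__dict__["mapped_ti_count"]

# Simulate the attribute being replaced by a different wrapper object
# (assumption: something re-creates or re-imports the class attribute).
Operator.mapped_ti_count = functools.lru_cache(maxsize=None)(
    stale_wrapper.__wrapped__
)


def pickling_error(obj):
    """Return the PicklingError raised when pickling obj, or None."""
    try:
        pickle.dumps(obj)
    except pickle.PicklingError as exc:
        return exc
    return None


# The stale wrapper can no longer be resolved to itself by name.
error = pickling_error(stale_wrapper)
print(type(error).__name__, error)
```

If the cause here is similar, the object handed to the process pool would hold a wrapper that differs from the one currently reachable as `AbstractOperator.get_parse_time_mapped_ti_count`, which would also fit the intermittent nature of the failures.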
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   The failures to send events were non-deterministic and appear to be caused 
by a race condition. They seem to occur more frequently when multiple DAGs are 
being scheduled simultaneously.
   
   I used the following code to reproduce the issue. With it, the provider 
failed to send at least one DAG start event almost every minute.
   
   ```python
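   from datetime import timedelta

   from airflow import DAG
   from airflow.operators.python import PythonOperator
   from airflow.utils.dates import days_ago

   # Four DAGs on the same one-minute schedule, so they are scheduled together.
   for i in range(4):
       with DAG(
           f'frequent_dag_{i}',
           schedule_interval=timedelta(minutes=1),
           start_date=days_ago(1),
           catchup=False,
       ) as dag:
           def task():
               print("Task is running")
   
           task = PythonOperator(
               task_id=f'print_task_{i}',
               python_callable=task,
               dag=dag,
           )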
   ```
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

