harrisonharris-di opened a new issue, #31902:
URL: https://github.com/apache/airflow/issues/31902

   ### Apache Airflow version
   
   2.6.1
   
   ### What happened
   
   When calling `.partial()` and `.expand()` on any operator that defines 
`operator_extra_links` as an instance property (e.g. with `@property`), the 
`MappedOperator` does not handle it properly, causing the DAG to fail to import.
   
   The `BatchOperator` from `airflow.providers.amazon.aws.operators.batch` is 
one example of an operator which defines `operator_extra_links` on a 
per-instance basis.
   
   ### What you think should happen instead
   
   The DAG should not fail to import (especially when using the AWS 
`BatchOperator`!). Either:
   
   - If per-instance `operator_extra_links` is deemed disallowed behaviour
     - `MappedOperator` should detect that it's a property and give a more helpful error message
     - `BatchOperator` from the AWS provider should be changed. If I need to open another ticket elsewhere for that, please let me know
   - If per-instance `operator_extra_links` is allowed
     - `MappedOperator` needs to be adjusted to account for that
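
The first option above (detect the property and fail with a clearer message) could look roughly like the sketch below. This is not Airflow code: `check_extra_links_not_property`, `StaticLinks` and `DynamicLinks` are hypothetical names used for illustration, and where such a check would live inside `MappedOperator` is left open.

```python
import inspect


def check_extra_links_not_property(operator_class: type) -> None:
    """Raise a descriptive error if operator_extra_links is a property.

    Hypothetical helper, not part of Airflow.
    """
    # getattr_static does not trigger descriptors, so a @property comes
    # back as the property object itself rather than its computed value.
    attr = inspect.getattr_static(operator_class, "operator_extra_links", None)
    if isinstance(attr, property):
        raise TypeError(
            f"{operator_class.__name__}.operator_extra_links is defined as a "
            "property; a class-level collection is needed for "
            ".partial()/.expand()"
        )


class StaticLinks:
    # Class-level collection: passes the check.
    operator_extra_links = ()


class DynamicLinks:
    # Per-instance property: rejected by the check.
    @property
    def operator_extra_links(self):
        return ()


check_extra_links_not_property(StaticLinks)  # no error
try:
    check_extra_links_not_property(DynamicLinks)
except TypeError as exc:
    helpful_message = str(exc)
```

With a check like this the failure would name the offending operator class instead of surfacing as an iteration error deep in serialization.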
   
   ### How to reproduce
   
   ```
   import pendulum
   from airflow.models.baseoperator import BaseOperator
   from airflow.decorators import dag, task
   class BadOperator(BaseOperator):
       def __init__(self, *args, some_argument: str, **kwargs):
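           # BaseOperator only accepts keyword arguments, so keep the custom
           # argument on the instance instead of forwarding it positionally.
           super().__init__(*args, **kwargs)
           self.some_argument = some_argument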
   
       @property
       def operator_extra_links(self):
           # !PROBLEMATIC FUNCTION!
           # ... Some code to create a collection of `BaseOperatorLink`s dynamically
           return tuple()
   
   @dag(
       schedule=None,
       start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
       catchup=False,
       tags=["example"]
   )
   def airflow_is_bad():
       """
       Example to demonstrate issue with airflow API
       """
   
       @task
       def create_arguments():
           return [1,2,3,4]
   
       bad_operator_test_group = BadOperator.partial(
           task_id="bad_operator_test_group",
       ).expand(some_argument=create_arguments())
   
   dag = airflow_is_bad()
   ```
   Put this in your dags folder; Airflow will fail to import the DAG with the error:
   ```
   Broken DAG: [<USER>/airflow/dags/airflow_is_bad_minimal_example.py] Traceback (most recent call last):
     File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 825, in _serialize_node
       serialize_op["_operator_extra_links"] = cls._serialize_operator_extra_links(
     File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 1165, in _serialize_operator_extra_links
       for operator_extra_link in operator_extra_links:
   TypeError: 'property' object is not iterable
   ```
   Commenting out `operator_extra_links` in the `BadOperator` example allows 
the DAG to be imported without error.
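
The `TypeError` in the traceback matches what plain Python does when a property is read from a class instead of an instance. The sketch below reproduces that without Airflow. `LinkedOperator` is a stand-in class, and the idea that the serializer reads the attribute from the operator class is an assumption inferred from the error message, not something the traceback shows directly.

```python
class LinkedOperator:
    """Stand-in for an operator with per-instance extra links."""

    @property
    def operator_extra_links(self):
        return ("link_a", "link_b")


# On an instance, the property is evaluated and yields the tuple.
instance_links = LinkedOperator().operator_extra_links

# On the class, the same attribute is the property object itself.
class_links = LinkedOperator.operator_extra_links

try:
    for _ in class_links:
        pass
    error_message = None
except TypeError as exc:
    # Same message as in the Airflow traceback.
    error_message = str(exc)
```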
   
   ### Operating System
   
   macOS Ventura 13.4
   
   ### Versions of Apache Airflow Providers
   
   ```
   apache-airflow-providers-amazon==8.0.0
   apache-airflow-providers-common-sql==1.4.0
   apache-airflow-providers-ftp==3.3.1
   apache-airflow-providers-google==10.0.0
   apache-airflow-providers-http==4.3.0
   apache-airflow-providers-imap==3.1.1
   apache-airflow-providers-postgres==5.4.0
   apache-airflow-providers-sqlite==3.3.2
   ```
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   I found that changing `operator_extra_links` on `BatchOperator` to the 
class-level attribute 
`operator_extra_links = (BatchJobDetailsLink(), BatchJobDefinitionLink(), 
BatchJobQueueLink(), CloudWatchEventsLink())` solved my issue and made it run 
fine. However, I have no idea whether that is safe or generalises, because I'm 
not sure what `operator_extra_links` is actually used for internally.
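
The same workaround can be tried without editing the provider, by overriding the property in a subclass. The sketch below uses plain stand-in classes and strings instead of the real `BatchOperator` and link objects, so every name in it is hypothetical. It only shows that a class-level tuple in a subclass shadows the inherited property for both class and instance access.

```python
class PropertyLinksOperator:
    """Stand-in for an operator that builds its links per instance."""

    @property
    def operator_extra_links(self):
        return ("details", "definition")


class ClassLinksOperator(PropertyLinksOperator):
    """Hypothetical subclass that pins the links at class level."""

    # A plain class attribute found earlier in the MRO replaces the
    # inherited property, so no instance is needed to read it.
    operator_extra_links = ("details", "definition", "queue", "logs")


class_level = ClassLinksOperator.operator_extra_links
instance_level = ClassLinksOperator().operator_extra_links
iterable_from_class = [link for link in class_level]
```

The trade-off is that every task instance then advertises the same set of links, so any link that only makes sense for some instances would need to cope with missing data on its own.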
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

