Taragolis opened a new pull request, #25248:
URL: https://github.com/apache/airflow/pull/25248

   Currently Airflow can't serialize a DAG with dynamically mapped tasks if `operator_extra_links` is defined as a property.
   related: #25243, #25215, #24676
   
   I've tried to resolve the actual value of the property during serialization.
   
   ---
   It works in simple cases: #25215, #24676
   
   ```python
   from pendulum import datetime
   
   from airflow.decorators import dag
   from airflow.sensors.external_task import ExternalTaskSensor
   
   
   @dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
   def external_task_sensor():
       ExternalTaskSensor.partial(
           task_id='wait',
       ).expand(external_dag_id=["dag_1", "dag_2", "dag_3"])
   
   _ = external_task_sensor()
   
   
   ```
   
![image](https://user-images.githubusercontent.com/3998685/180580272-b9011b08-4156-4518-9929-8fbc7133340e.png)
   
   However, there is no reason to define `operator_extra_links` as a property in these cases.
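
To illustrate the difference between the two declaration styles, here is a minimal sketch in plain Python that does not use Airflow at all. `ExampleLink`, `WithProperty` and `WithClassAttribute` are hypothetical names invented for this illustration. The sketch assumes the value is read from the class rather than from an instance; in that situation a property yields the `property` object itself, while a plain class attribute yields the links directly.

```python
class ExampleLink:
    """Hypothetical stand-in for an operator link class."""

    name = "Example"


EXAMPLE_LINKS = (ExampleLink(),)


class WithProperty:
    # Links are exposed through a property, so they only exist per instance.
    @property
    def operator_extra_links(self):
        return EXAMPLE_LINKS


class WithClassAttribute:
    # Links are a plain class attribute, readable without an instance.
    operator_extra_links = EXAMPLE_LINKS


# Reading from the class: a property object vs. the actual tuple of links.
from_property_class = WithProperty.operator_extra_links
from_attribute_class = WithClassAttribute.operator_extra_links

# Reading from an instance works in both cases.
from_property_instance = WithProperty().operator_extra_links

print(type(from_property_class).__name__)
print(from_attribute_class is EXAMPLE_LINKS)
print(from_property_instance is EXAMPLE_LINKS)
```

When the links do not depend on instance state, the class-attribute form avoids the problem entirely.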
   
   ---
   
   It still doesn't fully help when the property is used for dynamic link selection, as in #25243:
   
   ```python
   from pendulum import datetime
   
   from airflow.decorators import dag
   from airflow.providers.amazon.aws.operators.batch import BatchOperator
   
   
   @dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
   def batchop_dtm():
       BatchOperator.partial(
           task_id='submit_batch_job',
           job_queue="batch_job_queue_name",
           job_definition="batch_job_definition_name",
           overrides={},
           # Set this flag to False, so we can test the sensor below
           wait_for_completion=False,
       ).expand(job_name=["job_1", "job_2", "job_3"])
   
   
   _ = batchop_dtm()
   ```
   
   With this PoC the error changes: `airflow.models.mappedoperator.MappedOperator` doesn't have access to the attributes of the task.
   
   ```
   Broken DAG: [/files/dags/batch_mapping.py] Traceback (most recent call last):
     File "/opt/airflow/airflow/utils/helpers.py", line 390, in resolve_property_value
       return prop.fget(obj)
     File "/opt/airflow/airflow/providers/amazon/aws/operators/batch.py", line 117, in operator_extra_links
       if self.wait_for_completion:
   AttributeError: type object 'SerializedBaseOperator' has no attribute 'wait_for_completion'
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1178, in to_dict
       json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1086, in serialize_dag
       raise SerializationError(f'Failed to serialize DAG {dag.dag_id!r}: {e}')
   airflow.exceptions.SerializationError: Failed to serialize DAG 'batchop_dtm': type object 'SerializedBaseOperator' has no attribute 'wait_for_completion'
   ```
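
The same failure mode can be reproduced outside Airflow. The sketch below is a simplified stand-in, not the code from this PR: `Operator`, `SerializedStandIn`, and the body and signature of `resolve_property_value` are assumptions made for illustration. Only the `prop.fget(obj)` call is taken from the traceback. It shows that calling a property getter against an object that lacks the instance attribute (here a class, matching the "type object" wording of the error) raises the same kind of `AttributeError`.

```python
class Operator:
    """Hypothetical operator whose links depend on instance state."""

    def __init__(self, wait_for_completion=True):
        self.wait_for_completion = wait_for_completion

    @property
    def operator_extra_links(self):
        # The getter reads an attribute that exists only on real instances.
        if self.wait_for_completion:
            return ("logs_link",)
        return ()


class SerializedStandIn:
    """Hypothetical stand-in for a class with no 'wait_for_completion'."""


def resolve_property_value(obj, prop):
    # Assumed helper: evaluate a property against obj, pass other values through.
    if isinstance(prop, property):
        return prop.fget(obj)
    return prop


prop = Operator.operator_extra_links  # the property object itself

# Works: a real instance carries the attribute the getter needs.
resolved = resolve_property_value(Operator(wait_for_completion=True), prop)

# Fails: the stand-in class has no such attribute.
try:
    resolve_property_value(SerializedStandIn, prop)
except AttributeError as exc:
    error_message = str(exc)

print(resolved)
print(error_message)
```

Evaluating the getter therefore only helps when the object passed in carries every attribute the property body reads.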
   
   cc: @josh-fell 
   
   

