xOnelinx opened a new issue #10794:
URL: https://github.com/apache/airflow/issues/10794


   
   **Apache Airflow version**:  1.10.10
   
   
   **Kubernetes version (if you are using kubernetes)**: 1.14.10
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**: Any, GCP 
   - **OS** (e.g. from /etc/os-release): Ubuntu 1.10.10
   - **Kernel** (e.g. `uname -a`): Any
   - **Install tools**: Any
   - **Others**: NA
   
   **What happened**:
   DAGs with an ExternalTaskMarker don't clear the external tasks after the second use of Clear on the whole DAG.
   
   
   **What you expected to happen**:
   
   All external tasks should be cleared.
   
   **How to reproduce it**:
   enable DAG serialization: `store_serialized_dags = True`
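
The serialization switch above goes in `airflow.cfg` under `[core]` (it can also be set with the matching `AIRFLOW__CORE__STORE_SERIALIZED_DAGS` environment variable):

```ini
[core]
# Serve DAGs to the webserver from their serialized (DB) representation
store_serialized_dags = True
```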
   
   create example DAGs:
   
   <details><summary>Example DAGs (click to expand)</summary>

   ```python
   from datetime import datetime

   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.sensors.external_task_sensor import ExternalTaskSensor

   default_args = {'owner': 'airflow',
                   'start_date': datetime(2018, 1, 1)}


   def hello_world_py(*args):
       print('Hello World')
       print('This DAG is a dependency')


   schedule = '@daily'
   dag_id = 'dep_dag'
   with DAG(dag_id=dag_id,
            schedule_interval=schedule,
            default_args=default_args) as dag:
       t1 = PythonOperator(task_id='hello_world',
                           python_callable=hello_world_py)
       dep_1 = ExternalTaskSensor(task_id='child_task1',
                                  external_dag_id='hello_world_2',
                                  external_task_id='parent_task',
                                  mode='reschedule')
       dep_1 >> t1
   ```
   ```python
   from datetime import datetime

   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator


   def hello_world_py(*args):
       print('Hello World')


   def create_dag(dag_id, schedule, dag_number, default_args):
       dag = DAG(dag_id, schedule_interval=schedule,
                 default_args=default_args)
       with dag:
           t1 = PythonOperator(task_id='hello_world',
                               python_callable=hello_world_py,
                               op_args=[dag_number])
           # SerializableExternalTaskMarker is our subclass of
           # ExternalTaskMarker (its import is not shown here)
           parent_task = SerializableExternalTaskMarker(
               task_id='parent_task',
               external_dag_id='dep_dag',
               external_task_id='child_task1')
           t1 >> parent_task
       return dag


   for n in range(1, 4):
       dag_id = 'hello_world_{}'.format(n)
       default_args = {'owner': 'airflow',
                       'start_date': datetime(2018, 1, 1)}
       schedule = '@daily'
       dag_number = n
       globals()[dag_id] = create_dag(dag_id, schedule, dag_number,
                                      default_args)
   ```
   </details>
   
   1. Run both DAGs
   2. Wait until the first few dagruns have completed
   3. Clear the first dagrun in the DAG with the marker
   4. Check that the external DAG was cleared on this date
   5. Mark this date as success in each DAG, or wait until they complete
   6. Clear the DAG with the marker a second time on the same date
   7. The ExternalTaskMarker doesn't work
   
   **Anything else we need to know**:
   I think the ExternalTaskMarker doesn't work because of serialization: after 
serialization, each task instance gets an operator field equal to 
`SerializedBaseOperator`, so the marker logic doesn't trigger 
[here](https://github.com/apache/airflow/blob/master/airflow/models/dag.py#L1072)
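
A minimal sketch of the suspected failure mode (the class names mirror Airflow's, but the `isinstance` check is simplified here for illustration):

```python
class BaseOperator:
    pass


class ExternalTaskMarker(BaseOperator):
    """Marks a task in another DAG that should also be cleared."""


class SerializedBaseOperator(BaseOperator):
    """What every task becomes after round-tripping through serialization."""

    def __init__(self, task_type):
        # The original class survives only as a string, not as the type itself
        self.task_type = task_type


def should_follow_marker(task):
    # dag.clear() decides whether to recurse into the external DAG with an
    # isinstance check; after deserialization this is always False, because
    # the task is a SerializedBaseOperator, not an ExternalTaskMarker.
    return isinstance(task, ExternalTaskMarker)


real_marker = ExternalTaskMarker()
deserialized = SerializedBaseOperator(task_type='ExternalTaskMarker')

print(should_follow_marker(real_marker))   # True
print(should_follow_marker(deserialized))  # False: marker logic is skipped
```

This would explain why the first Clear works (the scheduler still holds the real operator) while later Clears, which see the deserialized DAG, do not cascade.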
 
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

