renzo-sanchez-h opened a new issue, #29199:
URL: https://github.com/apache/airflow/issues/29199

   ### Apache Airflow version
   
   2.5.1
   
   ### What happened
   
   Most of our code is based on the TaskFlow API, and many of our tasks raise 
AirflowSkipException (or use a BranchPythonOperator) on purpose to skip the next 
downstream task (with trigger_rule = none_failed_min_one_success).
   These downstream tasks expect a multiple-output XCom result 
(local_file_path, file size, record count) from the skipped upstream tasks, 
which causes this error:
   `airflow.exceptions.XComNotFound: XComArg result from 
copy_from_data_lake_to_local_file at outbound_dag_AIR2070 with 
key="local_file_path" is not found!`
   
   
   ### What you think should happen instead
   
   Given the trigger rule "none_failed_min_one_success", we expect the 
upstream task to be allowed to skip and the downstream tasks to still run, 
without any errors caused by XCom results that were never pushed.
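
   In other words, a missing XCom key from a skipped upstream should resolve to something like None rather than raise. A minimal pure-Python sketch of the expected lookup semantics (the dict store, `resolve_xcom`, and the flag are hypothetical stand-ins, not Airflow APIs):

```python
# Hypothetical illustration of the expected XCom resolution semantics.
# The xcom_store dict stands in for Airflow's XCom table; none of these
# names are real Airflow APIs.

def resolve_xcom(xcom_store, task_id, key, upstream_skipped=False):
    """Return the pushed value, or None when the upstream task was skipped,
    instead of raising an XComNotFound-style error."""
    try:
        return xcom_store[(task_id, key)]
    except KeyError:
        if upstream_skipped:
            # expected: tolerate skips under none_failed_min_one_success
            return None
        raise

store = {("copy_from_data_lake_to_local_file", "file_size"): 1024}

# Key pushed normally:
print(resolve_xcom(store, "copy_from_data_lake_to_local_file", "file_size"))  # 1024
# Key missing because the task was skipped: resolves to None, no exception.
print(resolve_xcom(store, "copy_from_data_lake_to_local_file",
                   "local_file_path", upstream_skipped=True))  # None
```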
   
   ### How to reproduce
   
   This is an approximate example DAG based on an existing one.
   
   ```python
    import pendulum
    from airflow import DAG
    from airflow.decorators import task
    from airflow.operators.python import BranchPythonOperator
    from src.helpers import Helper

    PGP_OPERATION = "decrypt"
    LOCAL_FILE_PATH = "/temp/example/example.csv"

    with DAG(
            dag_id='outbound_dag_AIR2070',
            schedule_interval='0 7-18 * * *',
            start_date=pendulum.datetime(2022, 12, 15, 7, 0, 0),
    ) as dag:
        @task(multiple_outputs=True, trigger_rule='none_failed_min_one_success')
        def copy_from_local_file_to_data_lake(local_file_path: str, dest_dir_path: str):
            destination_file_path, file_size = Helper.copy_from_local_file_to_data_lake(
                local_file_path,
                dest_dir_path
            )

            return {
                "destination_file_path": destination_file_path,
                "file_size": file_size
            }

        @task(multiple_outputs=True, trigger_rule='none_failed_min_one_success')
        def copy_from_data_lake_to_local_file(data_lake_file_path, local_dir_path):
            local_file_path, file_size = Helper.copy_from_data_lake_to_local_file(
                data_lake_file_path,
                local_dir_path
            )

            return {
                "local_file_path": local_file_path,
                "file_size": file_size
            }

        @task(multiple_outputs=True, task_id='get_pgp_file_info', trigger_rule='none_failed_min_one_success')
        def get_pgp_file_info(file_path, operation):
            import uuid, os

            src_file_name = os.path.basename(file_path)
            src_file_dir = os.path.dirname(file_path)
            run_id = str(uuid.uuid4())

            if operation == "decrypt":
                wait_pattern = f'*{src_file_name}'
            else:
                wait_pattern = f'*{src_file_name}.pgp'

            return {
                'src_file_path': file_path,
                'src_file_dir': src_file_dir,
                'target_path': src_file_dir,  # placeholder so the key used downstream exists
                'pattern': wait_pattern,
                'guid': run_id
            }

        @task(multiple_outputs=True, task_id='return_src_path', trigger_rule='none_failed_min_one_success')
        def return_src_path(src_file_path):
            import os

            return {
                'file_path': src_file_path,
                'file_size': os.path.getsize(src_file_path)
            }

        @task(multiple_outputs=True, task_id='choose_result', trigger_rule='none_failed_min_one_success')
        def choose_result(src_results, decrypt_results):
            import os

            file_path = decrypt_results['local_file_path'] or src_results['file_path']
            file_size = decrypt_results['file_size'] or src_results['file_size']

            local_dir = os.path.dirname(file_path)

            return {
                'local_dir': local_dir,
                'file_path': file_path,
                'file_size': file_size,
                'file_name': os.path.basename(file_path)
            }

        def switch_branch_func(pgp_operation):
            if pgp_operation in ["decrypt", "encrypt"]:
                return 'return_src_path'
            else:
                return 'get_pgp_file_info'

        operation = PGP_OPERATION
        local_file_path = LOCAL_FILE_PATH

        check_need_to_decrypt = BranchPythonOperator(
            task_id='branch_task',
            python_callable=switch_branch_func,
            op_args=(operation,)
        )

        pgp_file_info = get_pgp_file_info(local_file_path, operation)

        data_lake_file = copy_from_local_file_to_data_lake(
            pgp_file_info['src_file_path'],
            pgp_file_info['target_path']
        )

        decrypt_local_file = copy_from_data_lake_to_local_file(
            data_lake_file['destination_file_path'],
            pgp_file_info['src_file_dir']
        )

        src_result = return_src_path(local_file_path)

        result = choose_result(src_result, decrypt_local_file)

        check_need_to_decrypt >> [pgp_file_info, src_result]
        pgp_file_info >> decrypt_local_file
        [decrypt_local_file, src_result] >> result
   ```
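
   Until XCom resolution tolerates skipped upstreams, the fallback logic in `choose_result` can at least be written defensively. A pure-Python sketch of a skip-tolerant variant (the XCom dicts are passed in as plain dicts here purely for illustration; this on its own does not change how the XComArg itself is resolved):

```python
import os

def choose_result(src_results, decrypt_results):
    # Treat a skipped upstream as an empty dict and fall back key by key,
    # so a missing "local_file_path" does not blow up the downstream task.
    decrypt_results = decrypt_results or {}
    src_results = src_results or {}

    file_path = decrypt_results.get("local_file_path") or src_results.get("file_path")
    file_size = decrypt_results.get("file_size") or src_results.get("file_size")

    return {
        "local_dir": os.path.dirname(file_path),
        "file_path": file_path,
        "file_size": file_size,
        "file_name": os.path.basename(file_path),
    }

# Decrypt branch was skipped, so only the src results are available:
print(choose_result({"file_path": "/temp/example/example.csv", "file_size": 10}, None))
```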
   
   ### Operating System
   
   Windows 10
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   docker-compose version: 3.7
   
   Note: this also happens when it's deployed to one of our testing 
environments using the official Airflow Helm Chart.
   
   ### Anything else
   
   This issue is similar to 
[#24338](https://github.com/apache/airflow/issues/24338), which was solved by 
[#25661](https://github.com/apache/airflow/pull/25661), but this case involves 
multiple_outputs being set to True.
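
   For context, as I understand it `multiple_outputs=True` pushes each dict key as its own XCom entry in addition to `return_value`, so when the task is skipped those per-key entries simply never exist. A rough pure-Python simulation of the push side (the dict stands in for the XCom table; `push_multiple_outputs` is a hypothetical helper, not an Airflow API):

```python
# Rough simulation of what multiple_outputs=True does on the push side:
# each dict key becomes its own XCom entry in addition to return_value.
def push_multiple_outputs(xcom_store, task_id, result):
    for key, value in result.items():
        xcom_store[(task_id, key)] = value
    xcom_store[(task_id, "return_value")] = result

store = {}
push_multiple_outputs(store, "copy_from_data_lake_to_local_file",
                      {"local_file_path": "/tmp/f.csv", "file_size": 42})

# If the task is skipped, none of these entries are pushed, and a downstream
# keyed lookup like ("copy_from_data_lake_to_local_file", "local_file_path")
# has nothing to resolve -- hence the XComNotFound above.
print(store[("copy_from_data_lake_to_local_file", "local_file_path")])  # /tmp/f.csv
```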
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

