renzo-sanchez-h opened a new issue, #29199:
URL: https://github.com/apache/airflow/issues/29199
### Apache Airflow version
2.5.1
### What happened
Most of our code is based on the TaskFlow API, and many of our tasks deliberately raise
AirflowSkipException (or are skipped via a BranchPythonOperator) in order to skip the next
downstream task (which uses trigger_rule="none_failed_min_one_success").
These downstream tasks expect a multiple-output XCom result
(local_file_path, file sizes, record counts) from the upstream tasks, and when the
upstream task is skipped this error is raised:
`airflow.exceptions.XComNotFound: XComArg result from
copy_from_data_lake_to_local_file at outbound_dag_AIR2070 with
key="local_file_path" is not found!`
### What you think should happen instead
Given the trigger rule "none_failed_min_one_success", we expect that an
upstream task should be allowed to skip and that downstream tasks will still run
without raising any errors caused by missing XCom results.
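For reference, "none_failed_min_one_success" means the downstream task runs as long as no upstream failed and at least one succeeded; skipped upstreams are explicitly tolerated. A toy model of that rule (not Airflow's actual implementation) makes the expectation concrete:

```python
def none_failed_min_one_success(upstream_states):
    """Toy model of Airflow's trigger rule of the same name:
    run when no upstream failed and at least one succeeded;
    skipped upstreams do not block the task."""
    failed_states = {"failed", "upstream_failed"}
    return failed_states.isdisjoint(upstream_states) and "success" in upstream_states

# A skipped upstream next to a successful one should still let the task run:
print(none_failed_min_one_success(["success", "skipped"]))  # True
```

So by the rule's own definition the downstream task is supposed to run in this scenario; the error comes from the XCom lookup, not from scheduling.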
### How to reproduce
This is an approximate example DAG based on an existing one.
```python
import pendulum

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import BranchPythonOperator

from src.helpers import Helper

PGP_OPERATION = "decrypt"
LOCAL_FILE_PATH = "/temp/example/example.csv"

with DAG(
    dag_id="outbound_dag_AIR2070",
    schedule_interval="0 7-18 * * *",
    start_date=pendulum.datetime(2022, 12, 15, 7, 0, 0),
) as dag:

    @task(multiple_outputs=True, trigger_rule="none_failed_min_one_success")
    def copy_from_local_file_to_data_lake(local_file_path: str, dest_dir_path: str):
        destination_file_path, file_size = Helper.copy_from_local_file_to_data_lake(
            local_file_path,
            dest_dir_path,
        )
        return {
            "destination_file_path": destination_file_path,
            "file_size": file_size,
        }

    @task(multiple_outputs=True, trigger_rule="none_failed_min_one_success")
    def copy_from_data_lake_to_local_file(data_lake_file_path, local_dir_path):
        local_file_path, file_size = Helper.copy_from_data_lake_to_local_file(
            data_lake_file_path,
            local_dir_path,
        )
        return {
            "local_file_path": local_file_path,
            "file_size": file_size,
        }

    @task(multiple_outputs=True, task_id="get_pgp_file_info",
          trigger_rule="none_failed_min_one_success")
    def get_pgp_file_info(file_path, operation):
        import os
        import uuid

        src_file_name = os.path.basename(file_path)
        src_file_dir = os.path.dirname(file_path)
        run_id = str(uuid.uuid4())
        if operation == "decrypt":
            wait_pattern = f"*{src_file_name}"
        else:
            wait_pattern = f"*{src_file_name}.pgp"
        return {
            "src_file_path": file_path,
            "src_file_dir": src_file_dir,
            "pattern": wait_pattern,
            "guid": run_id,
        }

    @task(multiple_outputs=True, task_id="return_src_path",
          trigger_rule="none_failed_min_one_success")
    def return_src_path(src_file_path):
        import os

        return {
            "file_path": src_file_path,
            "file_size": os.path.getsize(src_file_path),
        }

    @task(multiple_outputs=True, task_id="choose_result",
          trigger_rule="none_failed_min_one_success")
    def choose_result(src_results, decrypt_results):
        import os

        file_path = decrypt_results["local_file_path"] or src_results["file_path"]
        file_size = decrypt_results["file_size"] or src_results["file_size"]
        local_dir = os.path.dirname(file_path)
        return {
            "local_dir": local_dir,
            "file_path": file_path,
            "file_size": file_size,
            "file_name": os.path.basename(file_path),
        }

    def switch_branch_func(pgp_operation):
        if pgp_operation in ["decrypt", "encrypt"]:
            return "return_src_path"
        else:
            return "get_pgp_file_info"

    operation = PGP_OPERATION
    local_file_path = LOCAL_FILE_PATH

    check_need_to_decrypt = BranchPythonOperator(
        task_id="branch_task",
        python_callable=switch_branch_func,
        op_args=(operation,),
    )

    pgp_file_info = get_pgp_file_info(local_file_path, operation)
    data_lake_file = copy_from_local_file_to_data_lake(
        pgp_file_info["src_file_path"],
        pgp_file_info["target_path"],
    )
    decrypt_local_file = copy_from_data_lake_to_local_file(
        data_lake_file["destination_file_path"],
        pgp_file_info["src_file_dir"],
    )
    src_result = return_src_path(local_file_path)
    result = choose_result(src_result, decrypt_local_file)

    check_need_to_decrypt >> [pgp_file_info, src_result]
    pgp_file_info >> decrypt_local_file
    [decrypt_local_file, src_result] >> result
```
### Operating System
Windows 10
### Versions of Apache Airflow Providers
_No response_
### Deployment
Docker-Compose
### Deployment details
docker-compose version: 3.7
Note: this also happens when the DAG is deployed to one of our testing
environments using the official Airflow Helm Chart.
### Anything else
This issue is similar to
[#24338](https://github.com/apache/airflow/issues/24338), which was solved by
[#25661](https://github.com/apache/airflow/pull/25661), but this case is specific
to `multiple_outputs` being set to `True`.
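With `multiple_outputs=True`, each key of the returned dict is pushed as its own XCom, so a downstream subscript like `decrypt_local_file['local_file_path']` resolves a keyed XCom. When the task is skipped, nothing is pushed for that key and the lookup fails. A toy sketch of that keyed lookup (not Airflow's actual code; the store and exception here are stand-ins):

```python
class XComNotFound(KeyError):
    """Toy stand-in for airflow.exceptions.XComNotFound."""

# (task_id, key) -> value; a skipped task pushes nothing, so its keys are absent.
xcom_store = {
    ("get_pgp_file_info", "src_file_path"): "/temp/example/example.csv",
}

def resolve_xcom(task_id, key):
    """Toy model of resolving a keyed XComArg such as task_output['local_file_path']."""
    try:
        return xcom_store[(task_id, key)]
    except KeyError:
        raise XComNotFound(
            f"XComArg result from {task_id} with key={key!r} is not found!"
        ) from None

# The upstream copy task was skipped, so its keyed XCom was never pushed:
try:
    resolve_xcom("copy_from_data_lake_to_local_file", "local_file_path")
except XComNotFound as err:
    print(err)
```

The fix from #25661 apparently tolerates the missing plain `return_value` XCom of a skipped task; the keyed lookups created by `multiple_outputs` still fail.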
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)