renzo-sanchez-h commented on issue #29199:
URL: https://github.com/apache/airflow/issues/29199#issuecomment-1446716515
hello, just found out another case recently related to this. It happens when
upstream failed too.
```python
import logging
import pendulum
from airflow import DAG
from airflow.decorators import task
DAG_ID = "testing_dag_AIR"
# SANITIZED = True
SANITIZED = False
with DAG(
dag_id=DAG_ID,
schedule='0 7-18 * * *',
start_date=pendulum.datetime(2022, 12, 15, 7, 0, 0),
) as dag:
@task
def sanitize(is_sanitized: bool):
if is_sanitized:
return "success"
raise Exception("Input layout is incorrect")
@task
def truncate_table():
return "success"
@task(multiple_outputs=True)
def execute_stored_procedure():
return {
"total_recs": 100,
"invalid_recs": 0,
"has_invalid_records": True
}
@task(trigger_rule='none_skipped')
def notify_data_team(total_recs: int, invalid_recs: int):
logging.info(f"notify_data_team__total_recs: {total_recs}")
logging.info(f"notify_data_team__invalid_recs: {invalid_recs}")
@task(trigger_rule='none_skipped')
def send_email(total_recs: int, invalid_recs: int):
logging.info(f"send_email__total_recs: {total_recs}")
logging.info(f"send_email__invalid_recs: {invalid_recs}")
is_sanitized = SANITIZED
sanitize_result = sanitize(is_sanitized)
truncated = truncate_table()
executed_sp = execute_stored_procedure()
notified_data_team = notify_data_team(executed_sp["total_recs"],
executed_sp["invalid_recs"])
sent_email = send_email(executed_sp["total_recs"],
executed_sp["invalid_recs"])
sanitize_result >> truncated >> executed_sp >> [notified_data_team,
sent_email]
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]