mykola-shyshov commented on issue #58303:
URL: https://github.com/apache/airflow/issues/58303#issuecomment-3533934853
Let me check. It is not hard to reproduce.
```
from datetime import datetime, timedelta

from airflow import DAG, Asset
from airflow.decorators import task

# Create an asset with a non-existing URI
non_existing_asset = Asset("s3://non-existing-bucket/path/to/data")

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2024, 1, 1),
    "retries": 0,
}

with DAG(
    dag_id="reproduce_issue_58303",
    default_args=default_args,
    description="Reproduce flaky AirflowInactiveAssetInInletOrOutletException",
    schedule=timedelta(seconds=30),  # Run every 30 seconds to trigger race condition
    catchup=False,
    tags=["bug_reproduction", "issue_58303"],
) as dag:

    @task(
        inlets=[non_existing_asset],  # This should trigger the flaky exception
        outlets=[],
    )
    def task_with_unused_inlet():
        """
        Simple task with an inlet that references a non-existing asset.

        According to issue #58303, this should intermittently raise
        AirflowInactiveAssetInInletOrOutletException due to a race condition.
        """
        print("Task executed successfully")
        print(f"This task declares inlet: {non_existing_asset.uri}")
        return "success"

    task_with_unused_inlet()
```