mykola-shyshov commented on issue #58303:
URL: https://github.com/apache/airflow/issues/58303#issuecomment-3533934853

   Let me check. It is not hard to reproduce.
   
   ```
   from datetime import datetime, timedelta
   from airflow import DAG, Asset
   from airflow.decorators import task
   
   # Create an asset with a non-existing URI
   non_existing_asset = Asset("s3://non-existing-bucket/path/to/data")
   
   default_args = {
       "owner": "airflow",
       "depends_on_past": False,
       "start_date": datetime(2024, 1, 1),
       "retries": 0,
   }
   
   with DAG(
       dag_id="reproduce_issue_58303",
       default_args=default_args,
       description="Reproduce flaky AirflowInactiveAssetInInletOrOutletException",
       schedule=timedelta(seconds=30),  # Run every 30 seconds to trigger race condition
       catchup=False,
       tags=["bug_reproduction", "issue_58303"],
   ) as dag:
   
       @task(
           inlets=[non_existing_asset],  # This should trigger the flaky exception
           outlets=[],
       )
       def task_with_unused_inlet():
           """
           Simple task with an inlet that references a non-existing asset.
           According to issue #58303, this should intermittently raise
           AirflowInactiveAssetInInletOrOutletException due to a race condition.
           """
           print("Task executed successfully")
           print(f"This task declares inlet: {non_existing_asset.uri}")
           return "success"
   
       task_with_unused_inlet()
   
   ```
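
Since the failure is intermittent, it may help to measure how often it shows up across runs. The following is only a rough sketch: it assumes the task logs are plain-text `*.log` files under some local directory, and the helper name `count_flaky_failures` is hypothetical, not part of Airflow.

```python
from pathlib import Path

# Exception name taken from the DAG description above; the rest is illustrative.
EXCEPTION_NAME = "AirflowInactiveAssetInInletOrOutletException"


def count_flaky_failures(log_dir: str) -> tuple[int, int]:
    """Return (log files scanned, log files that mention the exception)."""
    scanned = 0
    failed = 0
    # Assumption: every task attempt writes one *.log file somewhere below log_dir.
    for log_file in Path(log_dir).rglob("*.log"):
        scanned += 1
        text = log_file.read_text(encoding="utf-8", errors="replace")
        if EXCEPTION_NAME in text:
            failed += 1
    return scanned, failed
```

Calling it on the log directory for `reproduce_issue_58303` after the DAG has run for a while gives a rough failure rate, for example `scanned, failed = count_flaky_failures("/path/to/logs")`, where the path is a placeholder.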

