ajayganti3 opened a new issue, #51213:
URL: https://github.com/apache/airflow/issues/51213

   ### Apache Airflow Provider(s)
   
   amazon
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon 9.8.0
   
   ### Apache Airflow version
   
   3.0.1
   
   ### Operating System
   
   Ubuntu 22.04.3 LTS
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   I'm using the official Airflow Docker Compose file to run Airflow 3.0.1.
   
   ### What happened
   
   I have a DAG configured with an Amazon SQS trigger, using Airflow 3.0.1 and the Airflow assets approach to auto-trigger DAGs when a message arrives in the queue. The DAG works as expected for a short period (approximately 30 minutes), triggering whenever a message is sent to the SQS queue.
   
   However, after this initial period:
   
   - The DAG stops triggering even though new messages are available in the SQS queue.
   - Restarting Airflow with `docker compose restart` temporarily restores triggering.
   - After another ~30 minutes, the problem recurs.
   - In some cases, the DAG never triggers again, even after a restart.
   - The `aws_default` connection used for the SQS integration remains valid and operational (confirmed via manual testing).
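
One way to confirm that messages really are waiting in the queue while the DAG sits idle is to read the queue's approximate message counts. The snippet below is a minimal sketch of such a check, not necessarily the manual test used for this report. It assumes `boto3` is installed and that credentials equivalent to `aws_default` are available in the environment. The helper name `queue_depth` and the `main` wrapper are hypothetical, and the queue URL is the same placeholder as in the DAG.

```python
def queue_depth(client, queue_url: str) -> dict[str, int]:
    """Return the approximate visible and in-flight message counts for a queue."""
    response = client.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=[
            "ApproximateNumberOfMessages",
            "ApproximateNumberOfMessagesNotVisible",
        ],
    )
    # SQS returns attribute values as strings, so convert them to integers.
    attributes = response["Attributes"]
    return {
        "visible": int(attributes["ApproximateNumberOfMessages"]),
        "in_flight": int(attributes["ApproximateNumberOfMessagesNotVisible"]),
    }


def main() -> None:
    # boto3 is imported here so queue_depth can be exercised without AWS access.
    import boto3

    # Placeholder URL; replace my_account_id and my_queue_name.
    queue_url = "https://sqs.us-east-1.amazonaws.com/<my_account_id>/<my_queue_name>"
    sqs = boto3.client("sqs", region_name="us-east-1")
    print(queue_depth(sqs, queue_url))
```

Calling `main()` during the period when the DAG has stopped triggering shows whether the `visible` count keeps growing. If it does, that would be consistent with the trigger no longer polling the queue, although it does not by itself identify the cause.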
   
   ### What you think should happen instead
   
   The SQS-triggered DAG should:
   
   - Continuously monitor the queue and trigger the DAG when a new message 
arrives.
   - Remain functional indefinitely without needing to restart Airflow.
   - Provide meaningful logs or error messages if/when it stops functioning.
   
   ### How to reproduce
   
   1. Use the official Docker Compose file for Airflow 3.0.1.
   2. Start the Airflow environment with Docker Compose.
   3. Define a DAG with an SQS trigger using Airflow assets.
   4. Set up a valid `aws_default` connection with appropriate IAM permissions.
   5. Send messages to the SQS queue and observe that the DAG is triggered.
   6. Wait a while (roughly 30 minutes in my case), then send another message. Alternatively, use a separate Python script that sends messages to the queue at regular intervals.
   7. Observe that the DAG is no longer triggered.
   8. Restart Airflow with `docker compose restart` and observe that triggering works again briefly, then fails. (Sometimes the DAG is never triggered again, even after the restart.)
   9. Below is the Airflow DAG that I'm using.
   
   ```python
   from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
   from airflow.sdk import Asset, AssetWatcher, dag, task
   import os
   
   # Define the SQS queue URL
   # Replace my_account_id and my_queue_name
   SQS_QUEUE = "https://sqs.us-east-1.amazonaws.com/<my_account_id>/<my_queue_name>"
   
   # Define a trigger that listens to an external message queue (AWS SQS in this case)
   trigger = MessageQueueTrigger(
       aws_conn_id="aws_default",
       queue=SQS_QUEUE,
       waiter_delay=10,  # delay in seconds between polls
   )
   
   # Define an asset that watches for messages on the queue
   sqs_queue_asset = Asset(
       "sqs_queue_asset", watchers=[AssetWatcher(name="sqs_watcher", trigger=trigger)]
   )
   
   
   # Schedule the DAG to run when the asset is triggered
   @dag(schedule=[sqs_queue_asset])
   def event_driven_dag():
       @task
       def process_message(**context):
           # Extract the triggering asset events from the context
           triggering_asset_events = context["triggering_asset_events"]
           for event in triggering_asset_events[sqs_queue_asset]:
               # Get the message from the TriggerEvent payload.
               # Single quotes inside the f-string keep this valid before Python 3.12.
               print(
                   f"Processing message: {event.extra['payload']['message_batch'][0]['Body']}"
               )
   
       process_message()
   
   
   event_driven_dag()
   ```
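
For step 6, a periodic sender could look roughly like the sketch below. It is an illustration under assumptions, not the script used for this report: it assumes `boto3` is installed and credentials are available in the environment, and the names `build_message`, `send_periodically`, and `main`, along with the one-message-per-minute schedule, are hypothetical choices. The queue URL is the same placeholder as in the DAG above.

```python
import json
import time
from datetime import datetime, timezone


def build_message(sequence: int) -> str:
    """Return a JSON body that makes each test message easy to identify."""
    return json.dumps(
        {"sequence": sequence, "sent_at": datetime.now(timezone.utc).isoformat()}
    )


def send_periodically(
    client, queue_url: str, count: int, interval_seconds: float, sleep=time.sleep
) -> list[str]:
    """Send `count` messages, pausing between sends, and return the SQS message IDs."""
    message_ids = []
    for sequence in range(count):
        response = client.send_message(
            QueueUrl=queue_url, MessageBody=build_message(sequence)
        )
        message_ids.append(response["MessageId"])
        # No pause is needed after the final message.
        if sequence < count - 1:
            sleep(interval_seconds)
    return message_ids


def main() -> None:
    # boto3 is imported here so the helpers above can be exercised without AWS access.
    import boto3

    # Placeholder URL; replace my_account_id and my_queue_name.
    queue_url = "https://sqs.us-east-1.amazonaws.com/<my_account_id>/<my_queue_name>"
    sqs = boto3.client("sqs", region_name="us-east-1")
    # One message per minute for an hour, so the run spans the ~30 minute window.
    ids = send_periodically(sqs, queue_url, count=60, interval_seconds=60)
    print(f"Sent {len(ids)} messages")
```

Calling `main()` sends numbered, timestamped messages. Comparing the `sequence` of the last message the DAG processed with the messages sent afterwards gives a rough time at which triggering stopped.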
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

