jroachgolf84 opened a new issue, #50369:
URL: https://github.com/apache/airflow/issues/50369

   ### Description
   
   Less of a feature request and more of a conversation around how we'll start 
to build custom Asset Watchers.
   
   Is the general pattern going to be to build a Trigger, and leverage the 
out-of-the-box `airflow.sdk.AssetWatcher` when defining an `Asset`? Or instead, 
would a user/contributor define a custom Asset Watcher that has a baked-in 
Trigger.
   
   How will we pass information from an `AssetWatcher` object to downstream 
Tasks in a DAG? Will this always be via `context['triggering_asset_events'] 
...`? The most obvious example would be a file lands in an object store, and we 
want to process it. 
   
   Here's an example that's available in Astronomer's documentation, using SQS.
   
   ```
   # Define an asset that watches for messages on the queue
   sqs_queue_asset = Asset(
       "sqs_queue_asset", watchers=[AssetWatcher(name="sqs_watcher", 
trigger=trigger)]
   )
   
   
   # Schedule the DAG to run when the asset is triggered
   @dag(schedule=[sqs_queue_asset])
   def event_driven_dag():
       @task
       def process_message(**context):
           # Extract the triggering asset events from the context
           triggering_asset_events = context["triggering_asset_events"]
           for event in triggering_asset_events[sqs_queue_asset]:
               # Get the message from the TriggerEvent payload
               print(
                   f"Processing message: 
{event.extra["payload"]["message_batch"][0]["Body"]}"
               )
   
       process_message()
   
   
   event_driven_dag()
   ```
   
   ### Use case/motivation
   
   Event-driven (Asset-aware) scheduling is (IMO) going to be one of the most 
powerful features that Airflow users will leverage. I'm hoping to lay the 
groundwork for this as both users create custom Asset Watchers and contributors 
add to providers.
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to