jroachgolf84 opened a new issue, #50369:
URL: https://github.com/apache/airflow/issues/50369
### Description
Less of a feature request and more of a conversation around how we'll start
to build custom Asset Watchers.
Is the general pattern going to be to build a Trigger, and leverage the
out-of-the-box `airflow.sdk.AssetWatcher` when defining an `Asset`? Or instead,
would a user/contributor define a custom Asset Watcher that has a baked-in
Trigger.
How will we pass information from an `AssetWatcher` object to downstream
Tasks in a DAG? Will this always be via `context['triggering_asset_events']
...`? The most obvious example would be a file lands in an object store, and we
want to process it.
Here's an example that's available in Astronomer's documentation, using SQS.
```
# Define an asset that watches for messages on the queue
sqs_queue_asset = Asset(
"sqs_queue_asset", watchers=[AssetWatcher(name="sqs_watcher",
trigger=trigger)]
)
# Schedule the DAG to run when the asset is triggered
@dag(schedule=[sqs_queue_asset])
def event_driven_dag():
@task
def process_message(**context):
# Extract the triggering asset events from the context
triggering_asset_events = context["triggering_asset_events"]
for event in triggering_asset_events[sqs_queue_asset]:
# Get the message from the TriggerEvent payload
print(
f"Processing message:
{event.extra["payload"]["message_batch"][0]["Body"]}"
)
process_message()
event_driven_dag()
```
### Use case/motivation
Event-driven (Asset-aware) scheduling is (IMO) going to be one of the most
powerful features that Airflow users will leverage. I'm hoping to lay the
groundwork for this as both users create custom Asset Watchers and contributors
add to providers.
### Related issues
_No response_
### Are you willing to submit a PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]