GitHub user ketozhang edited a discussion: Improving the experience of manually triggering data-aware scheduling matching Airflow Params

Triggering a DAG with Params and triggering it with Dataset Events are very similar. Before data-aware scheduling was introduced, you could combine Params, `schedule=None`, and the REST API to achieve the same effect.
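As a concrete sketch of that pre-dataset workaround: you trigger the `schedule=None` DAG through Airflow's stable REST API, passing the payload as `conf` so the DAG reads it via Params. Only the endpoint path `POST /api/v1/dags/{dag_id}/dagRuns` comes from Airflow's API; the base URL, DAG id, and payload below are placeholder assumptions, and the helper just builds the request rather than sending it.

```python
import json

# Hypothetical helper: assembles the request for Airflow's
# "trigger a new DAG run" endpoint. Sending it (e.g. with `requests`)
# and authentication are left out of this sketch.
def build_trigger_request(base_url: str, dag_id: str, conf: dict) -> dict:
    return {
        "method": "POST",
        "url": f"{base_url}/api/v1/dags/{dag_id}/dagRuns",
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({"conf": conf}),
    }

req = build_trigger_request(
    "http://localhost:8080", "my_dag", {"extra": {"source": "manual"}}
)
print(req["url"])  # http://localhost:8080/api/v1/dags/my_dag/dagRuns
```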

Today, although you can theoretically use both, they are awkward to use together: you need to define a dedicated task that picks the right source of input and parses out either the Params dictionary or the Dataset Event's `extra` payload.

```mermaid
flowchart LR
  params[Params]
  dataset[Dataset Event]

  subgraph DAG
    direction LR
    A{parse} -- XCOM --> B[use_parsed_data]
  end

  params --> A
  dataset --> A
```

```python
from airflow.decorators import task
from airflow.models.dag import DAG

DATASET_URI = ...

with DAG(params=..., ...) as dag:

    @task
    def parse(**context):
        # Prefer the dataset event's payload if one triggered this run,
        # otherwise fall back to the manually supplied Params.
        if context['triggering_dataset_events'].get(DATASET_URI):
            return context['triggering_dataset_events'][DATASET_URI][0].extra
        elif context['params']:
            return context['params']['extra']
        else:
            raise RuntimeError('...')

    @task
    def use_parsed_data(parsed_data):
        ...

    use_parsed_data(parse())
```

Now, as it's designed today, there aren't many use cases where you should use both (outside of migrating between the two). However, if you replace Params in favor of data-aware scheduling, you lose out on a few features:

1. You can trigger a DAG from the Play/Run button.
    - With datasets, the closest equivalent today is the REST API at `POST /datasets/events`.
2. You can validate the payload downstream (i.e., Airflow Params have JSON Schema validation).
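The REST workaround from point 1 can be sketched like this: Airflow (2.9+) exposes a "create dataset event" endpoint at `POST /api/v1/datasets/events`. The base URL and `extra` payload below are placeholder assumptions, and the helper only builds the request rather than sending it.

```python
import json

# Hypothetical helper: assembles the request that manually emits a
# dataset event, which then triggers any DAG scheduled on that dataset.
def build_dataset_event_request(base_url: str, dataset_uri: str, extra: dict) -> dict:
    return {
        "method": "POST",
        "url": f"{base_url}/api/v1/datasets/events",
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({"dataset_uri": dataset_uri, "extra": extra}),
    }

req = build_dataset_event_request(
    "http://localhost:8080", "uri://foo/bar", {"run_mode": "manual"}
)
print(req["url"])  # http://localhost:8080/api/v1/datasets/events
```

This works, but compared to the Play button it requires auth setup and hand-written JSON, which is the usability gap this proposal is about.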

My only suggestion is to mirror the feature of Airflow Params where you can define a schema for the payload, plus a UI change that allows defining the dataset event after clicking the Play button.

```python
dataset = Dataset('uri://foo/bar', extra_schema={...})
with DAG(schedule=[dataset]) as dag:
    @task
    def process_payload(payload):
        ...

    # NITPICK: Really wish we had a shorter Jinja for this
    process_payload("{{ triggering_dataset_events['%s'][0]['extra'] }}" % dataset.uri)
``` 
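To make the `extra_schema` idea concrete (it is a proposed parameter, not an existing one), here is a minimal sketch of the validation a Dataset could run on an incoming event's `extra` payload before any task starts, mirroring the JSON Schema validation Params already get. Airflow Params use the `jsonschema` package; this hand-rolled checker only covers required keys and basic types.

```python
# Maps JSON Schema type names to Python types for this sketch.
TYPE_MAP = {
    "string": str,
    "integer": int,
    "number": (int, float),
    "boolean": bool,
    "object": dict,
    "array": list,
}

def validate_extra(extra: dict, schema: dict) -> None:
    """Raise ValueError if `extra` does not satisfy the (simplified) schema."""
    for key in schema.get("required", []):
        if key not in extra:
            raise ValueError(f"missing required key: {key}")
    for key, spec in schema.get("properties", {}).items():
        if key in extra and not isinstance(extra[key], TYPE_MAP[spec["type"]]):
            raise ValueError(f"{key!r} should be of type {spec['type']}")

schema = {
    "required": ["table"],
    "properties": {"table": {"type": "string"}, "rows": {"type": "integer"}},
}
validate_extra({"table": "foo", "rows": 10}, schema)  # passes silently
```

The same check could back both the Play-button form and the `POST /datasets/events` endpoint, so a malformed payload is rejected at trigger time instead of failing mid-DAG.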

With this, I think using data-aware scheduling and Airflow Params together becomes obsolete, since a single dataset would support both data-aware and manual triggering.

GitHub link: https://github.com/apache/airflow/discussions/43906
