GitHub user pedro-cf closed a discussion: Best practices for implementing 
data-aware DAGs using Datasets or Dataset aliases

I'm looking to implement a data pipeline using two DAGs and Airflow's Datasets 
feature. I'd appreciate guidance on the best way to structure this using 
Datasets or Dataset aliases.

## Current setup:

1. `ingest_DAG`:
   - Fetches data from an external API
   - Processes and loads the data into a PostgreSQL table (`ingest_table`)
   - Includes a `run_id` column in `ingest_table` to uniquely identify each 
batch of ingested data

2. `transform_DAG`:
   - Triggered by the Dataset produced by `ingest_DAG`
   - Must receive the `run_id` to identify the specific batch of data to process
   - Reads data from `ingest_table` using the received `run_id`
   - Processes the data
   - Inserts results into `transform_table`

## Current solution:

```python
# ingest_dag.py
from airflow import DAG
from airflow.decorators import task
from airflow.datasets import Dataset, DatasetAlias
from datetime import datetime

# Alias that downstream DAGs subscribe to; the concrete Dataset is
# attached to it at runtime via outlet_events.
DATA_ALIAS = DatasetAlias("data_alias")

with DAG(
    'ingest_dag',
    start_date=datetime(2023, 1, 1),
    schedule="@hourly",
    catchup=False,
    is_paused_upon_creation=False,
) as dag:

    @task
    def fetch_data():
        # Placeholder for the external API call.
        return {"data": "sample data"}

    @task
    def load_to_database(data):
        # Placeholder for writing the batch into ingest_table.
        print(f"Loading data to database: {data}")

    @task(outlets=[DATA_ALIAS])
    def produce_dataset(data, run_id=None, outlet_events=None):
        # run_id is injected from the task context; attach it to the
        # dataset event so the consumer DAG can read it from inlet_events.
        print(f"Producing dataset for run_id: {run_id}")
        outlet_events[DATA_ALIAS].add(Dataset("data_alias"), extra={"run_id": run_id})

    fetched_data = fetch_data()
    loaded_data = load_to_database(fetched_data)
    produce_dataset(loaded_data)
```
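For context, the real `load_to_database` will write into Postgres rather than just print. It would look roughly like this; the `postgres_default` connection id and the `payload` column are placeholders for my actual setup:

```python
# Rough sketch only: persist the batch into ingest_table tagged with the
# current run_id. Connection id and column names are placeholders.
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook

@task
def load_to_database(data, run_id=None):
    hook = PostgresHook(postgres_conn_id="postgres_default")  # assumed connection
    hook.run(
        "INSERT INTO ingest_table (run_id, payload) VALUES (%s, %s)",
        parameters=(run_id, str(data)),
    )
```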

```python
# transform_dag.py
from airflow import DAG
from airflow.decorators import task
from airflow.datasets import DatasetAlias
from datetime import datetime

DATA_ALIAS = DatasetAlias("data_alias")

with DAG(
    'transform_dag',
    start_date=datetime(2023, 1, 1),
    schedule=DATA_ALIAS,
    catchup=False,
    is_paused_upon_creation=False,
) as dag:

    @task(inlets=[DATA_ALIAS])
    def process_data(inlet_events=None):
        # Pull the run_id from the most recent dataset event that
        # triggered this DAG run.
        events = inlet_events[DATA_ALIAS]
        triggering_run_id = events[-1].extra["run_id"]
        print(f"Processing data for run_id: {triggering_run_id}")
        return {"processed_data": "sample processed data"}

    @task
    def store_processed_data(data):
        # Placeholder for inserting the results into transform_table.
        print(f"Storing processed data: {data}")

    processed_data = process_data()
    store_processed_data(processed_data)
```
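Similarly, the real `process_data` will query `ingest_table` for only the triggering batch. Roughly this (again, the connection id and column names are placeholders; `DATA_ALIAS` is the alias declared above):

```python
# Rough sketch only: read just the rows belonging to the batch that
# triggered this run, using the run_id carried on the dataset event.
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook

@task(inlets=[DATA_ALIAS])
def process_data(inlet_events=None):
    triggering_run_id = inlet_events[DATA_ALIAS][-1].extra["run_id"]
    hook = PostgresHook(postgres_conn_id="postgres_default")  # assumed connection
    rows = hook.get_records(
        "SELECT payload FROM ingest_table WHERE run_id = %s",
        parameters=(triggering_run_id,),
    )
    # Pass the run_id along so the downstream write can be scoped to it.
    return {"run_id": triggering_run_id, "row_count": len(rows)}
```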

## Questions:

1. Is my current approach using Datasets and Dataset aliases effective for this 
pipeline, or are there improvements I could make?
2. In my current setup, am I correctly passing the `run_id` from `ingest_DAG` 
to `transform_DAG` using Datasets? 
3. I'm currently including the `run_id` in the Dataset's `extra` field. Is this 
the recommended way to pass metadata between DAGs, or are there better 
alternatives?
4. In the `transform_DAG`, how can I ensure that only the data corresponding to 
the received `run_id` is processed? 
5. Given my current implementation, are there any potential pitfalls or common 
mistakes I should be aware of when using Datasets for inter-DAG communication?
6. Are there any performance considerations I should be aware of with this 
approach, especially as the volume of data increases?
7. How can I ensure idempotency in my `transform_DAG`, particularly if it needs 
to be rerun for the same `run_id`? (A rough sketch of what I have in mind 
follows this list.)
8. Are there any Airflow features or patterns that could simplify or improve my 
current implementation?
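
For question 7 (and partly question 4), this is roughly the run-scoped, idempotent write I have in mind for `transform_table`, assuming `process_data` passes the `run_id` along in its return value; the column names and connection id are placeholders:

```python
# Rough sketch only: delete any previous output for this run_id before
# inserting, so a rerun for the same batch does not duplicate rows.
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook

@task
def store_processed_data(data):
    run_id = data["run_id"]
    hook = PostgresHook(postgres_conn_id="postgres_default")  # assumed connection
    hook.run(
        "DELETE FROM transform_table WHERE run_id = %s",
        parameters=(run_id,),
    )
    hook.run(
        "INSERT INTO transform_table (run_id, result) VALUES (%s, %s)",
        parameters=(run_id, str(data)),
    )
```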

Any insights, examples, or recommendations based on my current solution would 
be greatly appreciated. Thank you!


GitHub link: https://github.com/apache/airflow/discussions/43126
