GitHub user assadali007 added a comment to the discussion: manual trigger dag

```python
from datetime import datetime
from pathlib import Path

import pandas as pd
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="01_unscheduled",
    start_date=datetime(2026, 1, 5),
    schedule_interval="@daily",
)

# Download the events from the local API into a JSON file.
fetch_events = BashOperator(
    task_id="fetch_events",
    bash_command="""
             mkdir -p /Users/asadali/data/events && \
             curl -o /Users/asadali/data/events/events.json \
             'http://localhost:5000/events'
    """,
    dag=dag,
)


def _calculate_stats(input_path, output_path):
    """Calculates event statistics."""
    print(input_path)
    print(output_path)

    # Make sure the output directory exists before writing.
    Path(output_path).parent.mkdir(exist_ok=True)

    # Count the number of events per (date, user) pair.
    events = pd.read_json(input_path)
    stats = events.groupby(["date", "user"]).size().reset_index()

    stats.to_csv(output_path, index=False)


calculate_stats = PythonOperator(
    task_id="calculate_stats",
    python_callable=_calculate_stats,
    op_kwargs={
        "input_path": "/Users/asadali/data/events/events.json",
        "output_path": "/Users/asadali/data/events/stats.csv",
    },
    dag=dag,
)

# Run the download first, then compute the statistics.
fetch_events >> calculate_stats
```

This is the code. I got this example from a book.
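
In case it helps with debugging, here is a minimal sketch for checking the counting step outside Airflow. It is not the book's code: it swaps pandas for the standard library so it runs anywhere, and the names `count_events`, `calculate_stats_plain`, the `count` column header, and the sample events are all hypothetical. It also assumes each event is a JSON object with `date` and `user` fields, which may not match what the real API returns.

```python
import csv
import json
import tempfile
from collections import Counter
from pathlib import Path


def count_events(events):
    """Count events per (date, user) pair, similar to groupby(...).size()."""
    return Counter((event["date"], event["user"]) for event in events)


def calculate_stats_plain(input_path, output_path):
    """Hypothetical pandas-free stand-in for _calculate_stats."""
    events = json.loads(Path(input_path).read_text())
    counts = count_events(events)

    # Create the output directory, including missing parents.
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)

    with open(output_path, "w", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(["date", "user", "count"])
        for (date, user), count in sorted(counts.items()):
            writer.writerow([date, user, count])


# Made-up sample events; the real API response may look different.
sample_events = [
    {"date": "2026-01-05", "user": "alice"},
    {"date": "2026-01-05", "user": "alice"},
    {"date": "2026-01-05", "user": "bob"},
    {"date": "2026-01-06", "user": "alice"},
]

# Write the sample to a temporary folder and run the stand-in on it.
with tempfile.TemporaryDirectory() as tmp:
    input_path = Path(tmp) / "events.json"
    output_path = Path(tmp) / "out" / "stats.csv"
    input_path.write_text(json.dumps(sample_events))
    calculate_stats_plain(input_path, output_path)
    stats_text = output_path.read_text()

print(stats_text)
```

If the counts look right here, the Python logic is probably fine and any remaining problem is more likely in how the DAG is scheduled or triggered.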

GitHub link: 
https://github.com/apache/airflow/discussions/60255#discussioncomment-15466475
