GitHub user Redevil10 edited a discussion: [Plugin] airflow-postgres-csv — Airflow operators for bulk PostgreSQL ↔ CSV transfers

Hi everyone 👋

I built a small pip-installable package that provides two operators for bulk 
PostgreSQL ↔ CSV data transfers using PostgreSQL's native `COPY` command:

- **`PostgresToCsvOperator`** — run a SQL query and stream results to a CSV file
- **`CsvToPostgresOperator`** — load a CSV file into a PostgreSQL table

**🔗 Repo:** https://github.com/Redevil10/airflow-postgres-csv
**📦 PyPI:** https://pypi.org/project/airflow-postgres-csv/

## Why I built this

In my day-to-day work I frequently need to export query results to CSV for 
downstream consumers, or bulk-load CSVs into staging tables. The existing 
`PostgresOperator` doesn't natively support CSV I/O, and writing `COPY` 
boilerplate in every DAG gets repetitive. I wanted something that:

- Uses `COPY` for maximum throughput (not row-by-row inserts)
- Supports gzip compression for large files
- Handles parameterised queries via `cursor.mogrify`
- Loads SQL from inline strings or `.sql` files
- Works cleanly with Airflow 3 and `apache-airflow-providers-postgres >= 6.0.0`
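For context, the `COPY` boilerplate being wrapped looks roughly like this. A minimal sketch — the helper name and exact option formatting are my illustration, not the package's actual internals:

```python
def build_copy_to_csv_sql(query: str, delimiter: str = ",", header: bool = True) -> str:
    """Build a server-side COPY statement that streams query results as CSV.

    The returned SQL is the kind of string you would pass to psycopg2's
    cursor.copy_expert(sql, file_object) to write the result set locally.
    """
    options = f"FORMAT csv, DELIMITER '{delimiter}', HEADER {'true' if header else 'false'}"
    return f"COPY ({query}) TO STDOUT WITH ({options})"

print(build_copy_to_csv_sql("SELECT * FROM users WHERE active = true"))
# COPY (SELECT * FROM users WHERE active = true) TO STDOUT WITH (FORMAT csv, DELIMITER ',', HEADER true)
```

Because `COPY ... TO STDOUT` streams rows in a single round trip, it avoids the per-row overhead of fetching and writing results one at a time.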

## Quick example

```python
from airflow_postgres_csv import PostgresToCsvOperator, CsvToPostgresOperator

# Export query results to CSV
export_task = PostgresToCsvOperator(
    task_id="export_users",
    conn_id="my_postgres",
    sql="SELECT * FROM users WHERE active = %(active)s",
    parameters={"active": True},
    csv_file_path="/tmp/users.csv",
)

# Load CSV into a table (with optional truncate)
import_task = CsvToPostgresOperator(
    task_id="import_users",
    conn_id="my_postgres",
    table_name="staging.users",
    csv_file_path="/tmp/users.csv",
    truncate=True,
)
```
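The import side with `truncate=True` boils down to two statements run in order: an optional `TRUNCATE`, then a `COPY ... FROM STDIN` fed from the file. A sketch of that sequence (the helper is hypothetical; the operator's real internals may differ):

```python
def build_load_statements(table_name: str, truncate: bool = False, header: bool = True) -> list[str]:
    """Return the SQL statements a truncate-and-load typically executes, in order."""
    statements = []
    if truncate:
        # Clear the target table before loading, as truncate=True would.
        statements.append(f"TRUNCATE TABLE {table_name}")
    options = f"FORMAT csv, HEADER {'true' if header else 'false'}"
    # The COPY statement reads rows from the client-supplied stream.
    statements.append(f"COPY {table_name} FROM STDIN WITH ({options})")
    return statements

for stmt in build_load_statements("staging.users", truncate=True):
    print(stmt)
```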

## Features

- **Gzip support** — set `compression="gzip"` on either operator for `.csv.gz` 
files
- **SQL from files** — pass a relative or absolute path to `sql` and it loads 
the file automatically
- **Configurable CSV options** — delimiter, quote char, null string, header 
toggle
- **Query timeout** — configurable timeout in minutes (default 60)
- **Templated fields** — `csv_file_path`, `table_name`, and `sql` all support 
Jinja templating
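Assuming the gzip path produces standard gzip-compressed CSV, the files are readable with Python's stdlib alone. A quick round trip — in memory here, but the same calls work on a real `.csv.gz` path:

```python
import csv
import gzip
import io

rows = [["id", "name"], ["1", "ada"], ["2", "grace"]]

# Write a .csv.gz payload; gzip.open also accepts a file path directly.
buf = io.BytesIO()
with gzip.open(buf, "wt", newline="") as fh:
    csv.writer(fh).writerows(rows)

# Read it back to confirm the round trip is lossless.
buf.seek(0)
with gzip.open(buf, "rt", newline="") as fh:
    assert list(csv.reader(fh)) == rows
```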

## Quality

- CI with linting and tests via GitHub Actions
- Code coverage tracked on Codecov
- Python 3.10 / 3.11 / 3.12 supported
- MIT licensed

## Install

```bash
pip install airflow-postgres-csv
```

This is my first package published to PyPI, so I'm sure there are things I 
could do better — whether it's packaging, API design, testing approach, or 
anything else. I'd really appreciate any feedback or suggestions. Happy to 
accept contributions too!

GitHub link: https://github.com/apache/airflow/discussions/62450
