GitHub user Redevil10 edited a discussion: [Plugin] airflow-postgres-csv — Airflow operators for bulk PostgreSQL ↔ CSV transfers
Hi everyone 👋

I built a small pip-installable package that provides two operators for bulk PostgreSQL ↔ CSV data transfers using PostgreSQL's native `COPY` command:

- **`PostgresToCsvOperator`** — run a SQL query and stream results to a CSV file
- **`CsvToPostgresOperator`** — load a CSV file into a PostgreSQL table

**🔗 Repo:** https://github.com/Redevil10/airflow-postgres-csv
**📦 PyPI:** https://pypi.org/project/airflow-postgres-csv/

## Why I built this

In my day-to-day work I frequently need to export query results to CSV for downstream consumers, or bulk-load CSVs into staging tables. The existing `PostgresOperator` doesn't natively support CSV I/O, and writing `COPY` boilerplate in every DAG gets repetitive. I wanted something that:

- Uses `COPY` for maximum throughput (not row-by-row inserts)
- Supports gzip compression for large files
- Handles parameterised queries via `cursor.mogrify`
- Loads SQL from inline strings or `.sql` files
- Works cleanly with Airflow 3 and `apache-airflow-providers-postgres >= 6.0.0`

## Quick example

```python
from airflow_postgres_csv import PostgresToCsvOperator, CsvToPostgresOperator

# Export query results to CSV
export_task = PostgresToCsvOperator(
    task_id="export_users",
    conn_id="my_postgres",
    sql="SELECT * FROM users WHERE active = %(active)s",
    parameters={"active": True},
    csv_file_path="/tmp/users.csv",
)

# Load CSV into a table (with optional truncate)
import_task = CsvToPostgresOperator(
    task_id="import_users",
    conn_id="my_postgres",
    table_name="staging.users",
    csv_file_path="/tmp/users.csv",
    truncate=True,
)
```

## Features

- **Gzip support** — set `compression="gzip"` on either operator for `.csv.gz` files
- **SQL from files** — pass a relative or absolute path to `sql` and it loads the file automatically
- **Configurable CSV options** — delimiter, quote char, null string, header toggle
- **Query timeout** — configurable timeout in minutes (default 60)
- **Templated fields** — `csv_file_path`, `table_name`, and `sql` all support Jinja templating

## Quality

- CI with linting and tests via GitHub Actions
- Code coverage tracked on Codecov
- Python 3.10 / 3.11 / 3.12 supported
- MIT licensed

## Install

```bash
pip install airflow-postgres-csv
```

This is my first package published to PyPI, so I'm sure there are things I could do better — whether it's packaging, API design, testing approach, or anything else. I'd really appreciate any feedback or suggestions. Happy to accept contributions too!

GitHub link: https://github.com/apache/airflow/discussions/62450
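
For readers unfamiliar with the `COPY` plus `cursor.mogrify` pattern mentioned in the post, here is a minimal sketch of how such an export might work with a psycopg2-style cursor. It is not the package's actual implementation: the function name `export_query_to_csv`, its arguments, and the UTF-8 decoding of the mogrified query are assumptions made for illustration. Only `cursor.mogrify` and `cursor.copy_expert` are real psycopg2 cursor methods.

```python
import gzip


def export_query_to_csv(cursor, sql, parameters, csv_file_path, compress=False):
    """Stream the result of `sql` into a CSV file via COPY ... TO STDOUT.

    Hypothetical helper for illustration; `cursor` is expected to behave
    like a psycopg2 cursor (it must offer `mogrify` and `copy_expert`).
    """
    # mogrify binds the parameters client-side and returns bytes, so the
    # finished query can be embedded inside the COPY statement.
    # Assumption: the connection encoding is UTF-8.
    query = cursor.mogrify(sql, parameters).decode("utf-8").strip().rstrip(";")
    copy_sql = f"COPY ({query}) TO STDOUT WITH (FORMAT csv, HEADER true)"

    # gzip.open and open share the same interface in binary mode, so the
    # compressed and uncompressed paths differ only in the opener used.
    opener = gzip.open if compress else open
    with opener(csv_file_path, "wb") as fh:
        # The server streams CSV data straight into the file object,
        # avoiding row-by-row fetching in Python.
        cursor.copy_expert(copy_sql, fh)
    return copy_sql
```

With a real database, `cursor` would come from something like `psycopg2.connect(...).cursor()`. The import direction would presumably mirror this with `COPY ... FROM STDIN` and a file opened for reading.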
