Hi all,

Per Jarek's suggestion in GitHub discussion #65453 [1], I'm bringing this
proposal to the devlist before opening PRs. The full long-form post is on
the discussion; this is a tighter summary for devlist review.


I'm working on improving Airflow 3.x's behavior on PostgreSQL-compatible
databases (currently focused on CockroachDB, which is wire-protocol
compatible with PG). Three small, generically justifiable improvements have
come out of that work. I want maintainer signal on the proposal before
drafting PRs.


== Background (skip if uninterested) ==

   - Integration POC validates Airflow 3.2.0 against CockroachDB v25.4:
   airflow db migrate succeeds, DAG parsing/scheduling/task execution all
   work, example DAGs run end-to-end. 111 Alembic migrations audited, 12
   findings, all handled.
   - A patch for sqlalchemy-cockroachdb [3] registers
   @compiles(timestampdiff, "cockroachdb") so MySQL-style
   func.timestampdiff(unit, start, end) (the fallback used in
   airflow-core/src/airflow/models/taskinstance.py and dagrun.py for
   non-PG/non-SQLite dialects) compiles to a PostgreSQL-style EXTRACT(EPOCH
   FROM ...) expression. This means Airflow itself needs no change for the
   timestampdiff issue once the dialect ships 2.0.4+.

So the items below are the only ones that genuinely require small upstream
changes.


*== Proposed change 1 — Generic 40001 serialization-failure retry in the
scheduler ==*

Files: airflow-core/src/airflow/utils/retries.py and
airflow-core/src/airflow/jobs/scheduler_job_runner.py

Why this is generic, not CRDB-specific: PostgreSQL also raises
SerializationFailure (SQLSTATE 40001) under high concurrency on
SELECT ... FOR NO KEY UPDATE ... SKIP LOCKED. Today Airflow's scheduler
does not retry these reliably. Issue #40882 [4] (closed) noted that
retry_db_transaction only issues session.rollback() for
OperationalError, not all DBAPIError subclasses, leaving subsequent retries
to fail with InFailedSqlTransaction.

Proposal:

   1. Fix retry_db_transaction to roll back on any DBAPIError, not just
   OperationalError.
   2. Apply it (or an equivalent retry loop) to
   _critical_section_enqueue_task_instances.
   3. Use exponential backoff and bound the number of retries.
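The combined shape of 1–3 can be sketched like this. It is a minimal sketch
with assumed names and signatures (the real retry_db_transaction in
retries.py differs; this version requires session as a keyword argument, and
the pgcode check assumes a psycopg2-style driver error):

```python
import functools
import random
import time

from sqlalchemy.exc import DBAPIError


def retry_db_transaction(retries=3, base_delay=0.1):
    """Retry a session-using callable on serialization failures (SQLSTATE 40001)."""

    def decorator(fn):
        @functools.wraps(fn)
        def wrapped(*args, session, **kwargs):
            for attempt in range(1, retries + 1):
                try:
                    return fn(*args, session=session, **kwargs)
                except DBAPIError as e:
                    # Roll back on *any* DBAPIError so the session leaves the
                    # failed-transaction state (the #40882 fix), not only on
                    # OperationalError.
                    session.rollback()
                    # psycopg2 exposes SQLSTATE as .pgcode on the driver error.
                    retriable = getattr(e.orig, "pgcode", None) == "40001"
                    if not retriable or attempt == retries:
                        raise
                    # Capped exponential backoff with a little jitter.
                    time.sleep(base_delay * 2 ** (attempt - 1) + random.uniform(0, 0.05))

        return wrapped

    return decorator
```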

Risk: Medium — touches the scheduler hot path. Should be guarded by tests
that fault-inject 40001 and assert progress. Benefit: Improves PG
resilience under concurrent scheduler workloads. Side benefit: makes the
scheduler usable on any PG-compatible DB whose isolation model surfaces
40001.

This is the largest of the three changes and the one I most want a steer on.


*== Proposed change 2 — Add cockroachdb to the async-driver mapping ==*

File: airflow-core/src/airflow/settings.py:240

def _get_async_conn_uri_from_sync(sync_uri):
    AIO_LIBS_MAPPING = {"sqlite": "aiosqlite", "postgresql": "asyncpg",
                        "mysql": "aiomysql"}
    ...

Proposal: Add "cockroachdb": "asyncpg" to the mapping.

Why standalone-justifiable: This function exists to derive an async URI
when one isn't explicitly configured. Today, anyone using a
cockroachdb:// SQLAlchemy URL (the canonical scheme registered by
sqlalchemy-cockroachdb since 2017) hits "The asyncio extension requires an
async driver. The loaded 'psycopg2' is not async." The workaround is
setting AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_ASYNC explicitly. One-line
fix; no behavior change for postgres/mysql/sqlite users.
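A sketch of the change, with a hypothetical stand-in for the derivation
helper (the helper name and body here are illustrative, not the actual
settings.py code):

```python
from sqlalchemy.engine import make_url

# Mapping as quoted from settings.py, plus the proposed one-line addition.
AIO_LIBS_MAPPING = {
    "sqlite": "aiosqlite",
    "postgresql": "asyncpg",
    "mysql": "aiomysql",
    "cockroachdb": "asyncpg",  # proposed: CRDB speaks the PG wire protocol
}


def derive_async_uri(sync_uri: str) -> str:
    # Hypothetical stand-in for _get_async_conn_uri_from_sync.
    url = make_url(sync_uri)
    backend = url.get_backend_name()
    aiolib = AIO_LIBS_MAPPING.get(backend)
    if aiolib is None:
        raise ValueError(f"no async driver known for {backend!r}")
    return str(url.set(drivername=f"{backend}+{aiolib}"))


print(derive_async_uri("cockroachdb://root@localhost:26257/airflow"))
# cockroachdb+asyncpg://root@localhost:26257/airflow
```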

Risk: Trivial. Touches one dict literal.


*== Proposed change 3 — Dialect-aware UUID generation in migration 0042 ==*

File: 
airflow-core/src/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py

The migration creates a custom uuid_generate_v7() SQL function via CREATE
EXTENSION pgcrypto. CockroachDB doesn't support the pgcrypto extension but
offers a native gen_random_uuid().

Proposal: Add a dialect-name check at the top of the upgrade so
non-postgres PG-compatible dialects can supply their own UUID
generator (defaulting to gen_random_uuid() for cockroachdb). PG users see
no change. The fallback uses v4 UUIDs (not v7) on the alternate
dialect, which is documented as a tradeoff.
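The shape of the guard could look like the following helper. Names and SQL
strings are assumptions for illustration, not the migration's actual code:

```python
# Hypothetical helper showing the shape of the dialect guard proposed for
# migration 0042 (called from upgrade() with op.get_bind().dialect.name).
def uuid_default_sql(dialect_name: str) -> str:
    if dialect_name == "postgresql":
        # Existing path: the migration's custom v7 function via pgcrypto.
        return "uuid_generate_v7()"
    if dialect_name == "cockroachdb":
        # No pgcrypto on CRDB; native v4 generator (documented tradeoff).
        return "gen_random_uuid()"
    raise NotImplementedError(f"no UUID generator for {dialect_name!r}")
```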

Risk: Low. Migration runs once, dialect-name-guarded, no impact on existing
PG installs.


== What I'm asking ==

   1. Are these acceptable as standalone PRs against main?
   2. Any framing or scoping changes you'd like before I draft them?
   3. For change 1 specifically: any preference on where the retry logic
   lives (decorator vs. explicit loop in the scheduler call site)?

== Disclosure ==

I work at Cockroach Labs. The integration POC and
sqlalchemy-cockroachdb#301 are linked below for full transparency. Happy to
share any additional context.


Thanks for reading.

Virag Tripathi


[1] https://github.com/apache/airflow/discussions/65453

[2] https://github.com/apache/airflow/issues/46175

[3] https://github.com/cockroachdb/sqlalchemy-cockroachdb/pull/301

[4] https://github.com/apache/airflow/issues/40882
