rawwar commented on code in PR #57325:
URL: https://github.com/apache/airflow/pull/57325#discussion_r2465008478
##########
airflow-core/docs/installation/upgrading_to_airflow3.rst:
##########
@@ -185,22 +188,163 @@ code import Airflow components correctly in Airflow 3. The older paths are depre
- **Future Airflow version**: Legacy imports will be **removed**

Step 4: Install the Standard Provider
---------------------------------------
+-------------------------------------

- Some of the commonly used Operators which were bundled as part of the ``airflow-core`` package (for example ``BashOperator`` and ``PythonOperator``) have now been split out into a separate package: ``apache-airflow-providers-standard``.
- For convenience, this package can also be installed on Airflow 2.x versions, so that Dags can be modified to reference these Operators from the standard provider package instead of Airflow Core.

+.. _migrating-database-access:
+
Step 5: Review custom operators for direct db access
----------------------------------------------------

- In Airflow 3 operators can not access the Airflow metadata database directly using database sessions. If you have custom operators, review the code to make sure there are no direct db access. You can follow examples in https://github.com/apache/airflow/issues/49187 to find how to modify your code if needed.
+Migrating Database Access in Tasks
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+In Airflow 2, tasks could directly access the Airflow metadata database using database sessions. This capability has been removed in Airflow 3 for security and architectural reasons. Here's how to migrate your code:
+
+**Airflow 2 Pattern (No longer supported in Airflow 3):**
+
+.. code-block:: python
+
+    # These patterns will NOT work in Airflow 3
+    from airflow import settings
+    from airflow.utils.session import provide_session, create_session
+    from airflow.models import TaskInstance, DagRun
+
+
+    # Direct database session access
+    @provide_session
+    def my_task_function(session=None):
+        # This will fail in Airflow 3
+        task_instances = session.query(TaskInstance).filter(...).all()
+        return task_instances
+
+
+    # Context manager approach
+    def another_task_function():
+        with create_session() as session:
+            # This will fail in Airflow 3
+            dag_runs = session.query(DagRun).filter(...).all()
+            return dag_runs
+
+
+    # Direct settings.Session usage
+    def direct_session_task():
+        session = settings.Session()
+        try:
+            # This will fail in Airflow 3
+            result = session.query(TaskInstance).count()
+            session.commit()
+        finally:
+            session.close()
+        return result
+
+**Airflow 3 Migration Path:**
+
+For most common database operations, use the Task SDK's API client instead:
+
+.. code-block:: python
+
+    from airflow.sdk import DAG, BaseOperator
+    from airflow.sdk.api.client import Client
+    from datetime import datetime
+
+
+    class MyCustomOperator(BaseOperator):
+        def execute(self, context):
+            # Get API client from context
+            client = context["task_instance"].task_sdk_client
+
+            # Get task instance count
+            count_result = client.task_instances.get_count(dag_id="my_dag", states=["success", "failed"])
+
+            # Get DAG run count
+            dag_run_count = client.dag_runs.get_count(dag_id="my_dag", states=["success"])
+
+            return {"ti_count": count_result.count, "dr_count": dag_run_count.count}
+
+**Alternative: Create Explicit Database Session (Advanced Users Only)**
+
+If you absolutely need direct database access for complex queries not covered by the API, you can create an explicit database session. **Use this approach with extreme caution** as it bypasses Airflow 3's security model:
+
+.. code-block:: python
+
+    from airflow.sdk import BaseOperator
+    from airflow.configuration import conf
+    from sqlalchemy import create_engine
+    from sqlalchemy.orm import sessionmaker
+    import logging
+
+
+    class DatabaseAccessOperator(BaseOperator):
+        """
+        WARNING: This approach bypasses Airflow 3's security model.
+        Use only when the Task SDK API doesn't provide the needed functionality.
+        """
+
+        def execute(self, context):
+            # Create explicit database connection
+            sql_alchemy_conn = conf.get("database", "sql_alchemy_conn")
+            engine = create_engine(sql_alchemy_conn)
+            Session = sessionmaker(bind=engine)

Review Comment:
   I don't think this will work. We explicitly update sql_alchemy_conn value to a wrong value.
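To make the concern concrete, below is a minimal, hypothetical probe. It is an illustration, not code from the PR or from the Airflow docs, and it assumes what the review comment states: that the Airflow 3 task runtime rewrites the [database] sql_alchemy_conn setting to a value that is not a usable connection string before user code runs. Under that assumption, the conf.get() / create_engine() pattern in the quoted DatabaseAccessOperator cannot yield a working session. The name probe_metadata_db_access is made up for this sketch.

.. code-block:: python

    # Illustrative sketch only -- not part of PR #57325.
    # Assumption (per the review comment): inside the Airflow 3 task runtime,
    # [database] sql_alchemy_conn has been deliberately overridden with a value
    # that does not point at the metadata database.
    from airflow.configuration import conf
    from sqlalchemy import create_engine
    from sqlalchemy.exc import SQLAlchemyError


    def probe_metadata_db_access() -> str:
        """Hypothetical helper: attempt the quoted pattern and report the outcome."""
        sql_alchemy_conn = conf.get("database", "sql_alchemy_conn")
        try:
            # create_engine() may already reject the overridden value; if it
            # does not, the connection attempt is expected to fail instead.
            engine = create_engine(sql_alchemy_conn)
            with engine.connect():
                return "unexpectedly reached the metadata database"
        except SQLAlchemyError as exc:
            return f"no direct metadata DB access from the task runtime: {exc}"

If that assumption holds, running probe_metadata_db_access() inside a Task SDK-managed task would be expected to land in the except branch, which is the failure mode the review comment is pointing at.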
