rawwar commented on code in PR #57325:
URL: https://github.com/apache/airflow/pull/57325#discussion_r2465024647


##########
airflow-core/docs/installation/upgrading_to_airflow3.rst:
##########
@@ -185,22 +188,163 @@ code import Airflow components correctly in Airflow 3. The older paths are depre
 - **Future Airflow version**: Legacy imports will be **removed**
 
 Step 4: Install the Standard Provider
---------------------------------------
+-------------------------------------
 
 - Some of the commonly used Operators which were bundled as part of the ``airflow-core`` package (for example ``BashOperator`` and ``PythonOperator``)
   have now been split out into a separate package: ``apache-airflow-providers-standard``.
 - For convenience, this package can also be installed on Airflow 2.x versions, so that Dags can be modified to reference these Operators from the
   standard provider package instead of Airflow Core.
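
The bullet points above can be illustrated with a small, hypothetical migration aid (it is not part of Airflow) that maps the legacy core-operator import paths to their standard-provider equivalents:

```python
# Hypothetical helper (not part of Airflow): maps legacy core-operator
# import paths to their standard-provider equivalents.
LEGACY_TO_STANDARD = {
    "airflow.operators.bash": "airflow.providers.standard.operators.bash",
    "airflow.operators.python": "airflow.providers.standard.operators.python",
}


def rewrite_import_line(line: str) -> str:
    """Rewrite a single ``from ... import ...`` line if it uses a legacy path."""
    for legacy, standard in LEGACY_TO_STANDARD.items():
        prefix = f"from {legacy} import"
        if line.startswith(prefix):
            return line.replace(f"from {legacy} import", f"from {standard} import", 1)
    return line


print(rewrite_import_line("from airflow.operators.bash import BashOperator"))
# from airflow.providers.standard.operators.bash import BashOperator
```

Because the standard provider can also be installed on Airflow 2.x, DAG files rewritten this way keep working before and after the upgrade.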
 
+.. _migrating-database-access:
+
 Step 5: Review custom operators for direct db access
 ----------------------------------------------------
 
 - In Airflow 3, operators cannot access the Airflow metadata database directly using database sessions.
   If you have custom operators, review the code to make sure there is no direct db access.
   You can follow the examples in https://github.com/apache/airflow/issues/49187 to see how to modify your code if needed.
 
+Migrating Database Access in Tasks
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+In Airflow 2, tasks could directly access the Airflow metadata database using database sessions. This capability has been removed in Airflow 3 for security and architectural reasons. Here's how to migrate your code:
+
+**Airflow 2 Pattern (No longer supported in Airflow 3):**
+
+.. code-block:: python
+
+    # These patterns will NOT work in Airflow 3
+    from airflow import settings
+    from airflow.utils.session import provide_session, create_session
+    from airflow.models import TaskInstance, DagRun
+
+
+    # Direct database session access
+    @provide_session
+    def my_task_function(session=None):
+        # This will fail in Airflow 3
+        task_instances = session.query(TaskInstance).filter(...).all()
+        return task_instances
+
+
+    # Context manager approach
+    def another_task_function():
+        with create_session() as session:
+            # This will fail in Airflow 3
+            dag_runs = session.query(DagRun).filter(...).all()
+            return dag_runs
+
+
+    # Direct settings.Session usage
+    def direct_session_task():
+        session = settings.Session()
+        try:
+            # This will fail in Airflow 3
+            result = session.query(TaskInstance).count()
+            session.commit()
+        finally:
+            session.close()
+        return result
+
+**Airflow 3 Migration Path:**
+
+For most common database operations, use the Task SDK's API client instead:
+
+.. code-block:: python
+
+    from airflow.sdk import DAG, BaseOperator
+    from airflow.sdk.api.client import Client
+    from datetime import datetime
+
+
+    class MyCustomOperator(BaseOperator):
+        def execute(self, context):
+            # Get API client from context
+            client = context["task_instance"].task_sdk_client
+
+            # Get task instance count
+            count_result = client.task_instances.get_count(dag_id="my_dag", states=["success", "failed"])
+
+            # Get DAG run count
+            dag_run_count = client.dag_runs.get_count(dag_id="my_dag", states=["success"])
+
+            return {"ti_count": count_result.count, "dr_count": dag_run_count.count}
+
+**Alternative: Create Explicit Database Session (Advanced Users Only)**
+
+If you absolutely need direct database access for complex queries not covered by the API, you can create an explicit database session. **Use this approach with extreme caution**, as it bypasses Airflow 3's security model:
+
+.. code-block:: python
+
+    from airflow.sdk import BaseOperator
+    from airflow.configuration import conf
+    from sqlalchemy import create_engine
+    from sqlalchemy.orm import sessionmaker
+    import logging
+
+
+    class DatabaseAccessOperator(BaseOperator):
+        """
+        WARNING: This approach bypasses Airflow 3's security model.
+        Use only when the Task SDK API doesn't provide the needed functionality.
+        """
+
+        def execute(self, context):
+            # Create explicit database connection
+            sql_alchemy_conn = conf.get("database", "sql_alchemy_conn")
+            engine = create_engine(sql_alchemy_conn)
+            Session = sessionmaker(bind=engine)

Review Comment:
   I think @vikramkoka's idea was to have the metadata DB connection string duplicated as a separate new env variable and then create a new session with that instead.
   
   Something like this,
   
   ```python
   import os

   sql_alchemy_conn = os.environ["my_new_connection_string"]
   engine = create_engine(sql_alchemy_conn)
   ```
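
The reviewer's sketch, filled out so it runs standalone (the env variable name ``MY_NEW_CONNECTION_STRING`` is hypothetical, and the ``sqlite://`` fallback is only a stand-in default for demonstration):

```python
import os

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# Hypothetical env variable name -- not an official Airflow setting. The
# "sqlite://" fallback creates an in-memory database so this sketch runs
# standalone; in practice the variable would hold the metadata DB URI.
conn_str = os.environ.get("MY_NEW_CONNECTION_STRING", "sqlite://")
engine = create_engine(conn_str)
Session = sessionmaker(bind=engine)

with Session() as session:
    # Any query against the configured database works here.
    result = session.execute(text("SELECT 1")).scalar()

print(result)
```

Reading the connection string from a dedicated env variable, rather than from ``conf.get("database", "sql_alchemy_conn")``, keeps the escape hatch explicit and separately revocable.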



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
