This is an automated email from the ASF dual-hosted git repository.

utkarsharma pushed a commit to branch v2-10-stable
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-stable by this push:
     new 91f8265b68 Sync v2-10-stable with v2-10-test to release python client v2.10.0 (#41610)
91f8265b68 is described below

commit 91f8265b6820eca747634dce39ae0d70aada2640
Author: Utkarsh Sharma <[email protected]>
AuthorDate: Thu Aug 22 14:30:09 2024 +0530

    Sync v2-10-stable with v2-10-test to release python client v2.10.0 (#41610)
    
    * Enable pull requests to be run from v*test branches (#41474) (#41476)
    
    Since we switched from directly pushing cherry-picks to opening PRs
    against the v*-test branches, we should enable PRs to run for the
    target branch.
    
    (cherry picked from commit a9363e6a30d73a647ed7d45c92d46d1f6f98513f)
    
    * Prevent provider lowest-dependency tests from running in non-main branches (#41478) (#41481)
    
    When running tests in the v2-10-test branch, lowest-dependency tests
    were run for providers, because "skip_provider_tests" was not used to
    filter them out when calculating the separate tests.
    
    This PR fixes it (see the sketch below).
    
    (cherry picked from commit 75da5074969ec874040ea094d5afe00b7f02be76)
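
    For illustration, a standalone sketch of the filtering added in
    selective_checks.py (the helper name and sample values here are
    hypothetical; the set comprehension mirrors the diff below):

        def filter_provider_test_types(test_types: set[str],
                                       skip_provider_tests: bool) -> set[str]:
            # drop all Providers[...] test types when provider tests are skipped
            if skip_provider_tests:
                return {t for t in test_types if not t.startswith("Providers")}
            return test_types

        print(filter_provider_test_types({"Core", "Providers[amazon]", "WWW"}, True))
        # -> {'Core', 'WWW'} (set order may vary)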
    
    * Make PROD image building work in non-main PRs (#41480) (#41484)
    
    PROD image building currently fails in non-main PRs because it
    attempts to build provider packages from sources rather than install
    them from PyPI when the PR targets a "v*-test" branch.
    
    This PR fixes it:
    
    * PROD images in non-main-targeted builds will pull providers from
      PyPI rather than build them
    * they use PyPI constraints to install the providers
    * they use UV, which should speed up building of the images
    
    (cherry picked from commit 4d5f1c42a7873329b1b6b8b9b39db2c3033b46df)
    
    * Add WebEncoder for trigger page rendering to avoid render failure (#41350) (#41485)
    
    Co-authored-by: M. Olcay Tercanlı <[email protected]>
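
    A minimal sketch of why a custom JSON encoder is needed when
    pre-populating the trigger form: dag_run.conf may contain values that
    plain json.dumps cannot encode. "WebEncoderLike" below is a
    hypothetical stand-in for the WebEncoder used in the diff:

        import datetime
        import json

        class WebEncoderLike(json.JSONEncoder):
            def default(self, obj):
                # render dates/datetimes as ISO strings instead of raising
                if isinstance(obj, (datetime.date, datetime.datetime)):
                    return obj.isoformat()
                return super().default(obj)

        conf = {"logical_date": datetime.datetime(2024, 8, 22)}
        print(json.dumps(conf, cls=WebEncoderLike))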
    
    * Incorrect try number subtraction producing invalid span id for OTEL airflow (issue #41501) (#41502) (#41535)
    
    * Fix for issue #39336
    
    * removed unnecessary import
    
    (cherry picked from commit dd3c3a7a43102c967d76cdcfe1f2f8ebeef4e212)
    
    Co-authored-by: Howard Yoo <[email protected]>
    
    * Fix failing pydantic v1 tests (#41534) (#41541)
    
    We need to exclude some versions of Pydantic v1 because they conflict
    with the aws provider.
    
    (cherry picked from commit a033c5f15a033c751419506ea77ffdbacdd37705)
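
    The exclusion is expressed as a pip version specifier (the exact string
    appears in the Dockerfile.ci and entrypoint_ci.sh changes below); for
    example, checked with the "packaging" library:

        from packaging.specifiers import SpecifierSet

        spec = SpecifierSet("<2.0.0,!=1.10.17,!=1.10.15")
        assert "1.10.16" in spec
        assert "1.10.17" not in spec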
    
    * Fix Non-DB test calculation for main builds (#41499) (#41543)
    
    Pytest has a quirk where it will not collect tests from a parent
    folder when one of its subfolders is specified after the parent
    folder. This caused some non-DB tests from the providers folder to be
    skipped during the main build.
    
    The issue (behaviour changed in Pytest 8.2; it used to work before) is
    tracked at https://github.com/pytest-dev/pytest/issues/12605
    
    (cherry picked from commit d48982692c54d024d7c05e1efb7cd2adeb7d896c)
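
    To work around it, the breeze helper now drops provider sub-folders
    whenever the parent "tests/providers" folder is already selected; a
    standalone sketch (helper name hypothetical, logic from the
    run_tests.py change below):

        def dedupe_provider_folders(folders: list[str]) -> list[str]:
            # keep only the parent folder so pytest collects the whole tree
            # (see https://github.com/pytest-dev/pytest/issues/12605)
            if "tests/providers" in folders:
                return [f for f in folders if not f.startswith("tests/providers/")]
            return folders

        print(dedupe_provider_folders(
            ["tests/core", "tests/providers", "tests/providers/amazon"]))
        # -> ['tests/core', 'tests/providers']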
    
    * Add changelog for airflow python client 2.10.0 (#41583) (#41584)
    
    * Add changelog for airflow python client 2.10.0
    
    * Update client version
    
    (cherry picked from commit 317a28ed435960e7184e357a2f128806c34612fa)
    
    * Make all tests pass in Database Isolation mode (#41567)
    
    This adds a dedicated "DatabaseIsolation" test job to the airflow
    v2-10-test branch.
    
    The DatabaseIsolation job will run all "db-tests" with DB isolation
    mode enabled and the `internal-api` component running - groups of
    tests marked with "skip-if-database-isolation" will be skipped.
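
    In practice the skipped groups are regular test modules marked with a
    pytest marker registered by the Airflow test suite, as in several of
    the test-module changes in the diff below:

        import pytest

        pytestmark = [pytest.mark.db_test,
                      pytest.mark.skip_if_database_isolation_mode]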
    
    * Upgrade build and chart dependencies (#41570) (#41588)
    
    (cherry picked from commit c88192c466cb91842310f82a61eaa48b39439bef)
    
    Co-authored-by: Jarek Potiuk <[email protected]>
    
    * Limit watchtower dependency as 3.3.0 breaks moin. (#41612)
    
    (cherry picked from commit 1b602d50266184d118db52a674baeab29b1f5688)
    
    * Enable running Pull Requests against v2-10-stable branch (#41624)
    
    (cherry picked from commit e306e7f7bc1ef12aeab0fc09e018accda3684a2f)
    
    * Fix tests/models/test_variable.py for database isolation mode (#41414)
    
    * Fix tests/models/test_variable.py for database isolation mode
    
    * Review feedback
    
    (cherry picked from commit 736ebfe3fe2bd67406d5a50dacbfa1e43767d4ce)
    
    * Make latest botocore tests green (#41626)
    
    The latest botocore tests conflict with a few requirements, and until
    the upcoming apache-beam version is released we need some manual
    exclusions. Those exclusions should make the latest botocore tests
    green again.
    
    (cherry picked from commit a13ccbbdec8e59f30218f604fca8cbb999fcb757)
    
    * Simpler task retrieval for taskinstance test (#41389)
    
    The test has been updated for DB isolation, but the task retrieval was
    not intuitive and could possibly lead to flaky tests.
    
    (cherry picked from commit f25adf14ad486bac818fe3fdcd61eb3355e8ec9b)
    
    * Skip database isolation case for task mapping taskinstance tests (#41471)
    
    Related: #41067
    (cherry picked from commit 7718bd7a6ed7fb476e4920315b6d11f1ac465f44)
    
    * Skipping tests for db isolation because similar tests were skipped (#41450)
    
    (cherry picked from commit e94b508b946471420488cc466d92301b54b4c5ae)
    
    ---------
    
    Co-authored-by: Jarek Potiuk <[email protected]>
    Co-authored-by: Brent Bovenzi <[email protected]>
    Co-authored-by: M. Olcay Tercanlı <[email protected]>
    Co-authored-by: Howard Yoo <[email protected]>
    Co-authored-by: Jens Scheffler <[email protected]>
    Co-authored-by: Bugra Ozturk <[email protected]>
---
 .github/workflows/ci.yml                           |  2 +-
 .github/workflows/run-unit-tests.yml               |  6 ++
 .github/workflows/special-tests.yml                | 23 ++++++++
 Dockerfile                                         |  2 +-
 Dockerfile.ci                                      | 16 +++---
 airflow/api_internal/endpoints/rpc_api_endpoint.py | 15 ++---
 airflow/api_internal/internal_api_call.py          |  2 +-
 airflow/executors/base_executor.py                 |  2 +-
 airflow/executors/local_executor.py                |  2 +-
 airflow/executors/sequential_executor.py           |  2 +-
 airflow/jobs/scheduler_job_runner.py               |  2 +-
 airflow/models/baseoperator.py                     |  3 +-
 airflow/models/variable.py                         | 66 +++++++++++++++++++++-
 airflow/providers/amazon/provider.yaml             |  2 +-
 airflow/serialization/enums.py                     |  1 +
 airflow/serialization/serialized_objects.py        | 16 +++++-
 airflow/settings.py                                | 51 ++++++++++++-----
 airflow/traces/utils.py                            |  7 +--
 airflow/www/views.py                               |  3 +-
 chart/values.schema.json                           |  2 +-
 chart/values.yaml                                  |  2 +-
 clients/python/CHANGELOG.md                        | 20 +++++++
 clients/python/version.txt                         |  2 +-
 .../airflow_breeze/commands/testing_commands.py    |  2 +
 .../templates/CHANGELOG_TEMPLATE.rst.jinja2        |  6 +-
 dev/breeze/src/airflow_breeze/utils/run_tests.py   | 18 ++++--
 .../src/airflow_breeze/utils/selective_checks.py   |  4 ++
 .../tests/test_pytest_args_for_test_types.py       | 13 +++++
 dev/breeze/tests/test_selective_checks.py          |  5 +-
 generated/provider_dependencies.json               |  2 +-
 pyproject.toml                                     |  3 +
 scripts/docker/entrypoint_ci.sh                    | 12 ++--
 tests/always/test_example_dags.py                  | 20 ++-----
 tests/cli/commands/test_internal_api_command.py    |  3 +
 tests/cli/commands/test_webserver_command.py       |  3 +
 tests/decorators/test_bash.py                      |  3 +
 tests/decorators/test_branch_external_python.py    |  3 +-
 tests/decorators/test_branch_python.py             |  3 +-
 tests/decorators/test_branch_virtualenv.py         |  3 +-
 tests/decorators/test_condition.py                 |  3 +-
 tests/decorators/test_external_python.py           |  2 +-
 tests/decorators/test_python.py                    | 11 +++-
 tests/decorators/test_python_virtualenv.py         |  4 +-
 tests/decorators/test_sensor.py                    |  3 +-
 tests/decorators/test_short_circuit.py             |  2 +-
 tests/models/test_taskinstance.py                  |  9 ++-
 tests/models/test_variable.py                      | 10 +++-
 tests/operators/test_python.py                     |  3 +
 tests/providers/opensearch/conftest.py             | 11 +++-
 .../providers/opensearch/hooks/test_opensearch.py  |  3 +-
 .../opensearch/operators/test_opensearch.py        |  3 +
 tests/sensors/test_external_task_sensor.py         |  2 +
 tests/serialization/test_dag_serialization.py      |  2 +
 tests/www/views/test_views_trigger_dag.py          | 28 +++++++++
 54 files changed, 350 insertions(+), 98 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index a1fa32e3ad..866f8f253d 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -23,7 +23,7 @@ on:  # yamllint disable-line rule:truthy
   push:
     branches: ['v[0-9]+-[0-9]+-test']
   pull_request:
-    branches: ['main']
+    branches: ['main', 'v[0-9]+-[0-9]+-test', 'v[0-9]+-[0-9]+-stable']
   workflow_dispatch:
 permissions:
   # All other permissions are set to none
diff --git a/.github/workflows/run-unit-tests.yml 
b/.github/workflows/run-unit-tests.yml
index 7828e50ed7..2989a952d9 100644
--- a/.github/workflows/run-unit-tests.yml
+++ b/.github/workflows/run-unit-tests.yml
@@ -104,6 +104,11 @@ on:  # yamllint disable-line rule:truthy
         required: false
         default: "true"
         type: string
+      database-isolation:
+        description: "Whether to enable database isolattion or not 
(true/false)"
+        required: false
+        default: "false"
+        type: string
       force-lowest-dependencies:
         description: "Whether to force lowest dependencies for the tests or 
not (true/false)"
         required: false
@@ -152,6 +157,7 @@ jobs:
       PYTHON_MAJOR_MINOR_VERSION: "${{ matrix.python-version }}"
       UPGRADE_BOTO: "${{ inputs.upgrade-boto }}"
       AIRFLOW_MONITOR_DELAY_TIME_IN_SECONDS: 
"${{inputs.monitor-delay-time-in-seconds}}"
+      DATABASE_ISOLATION: "${{ inputs.database-isolation }}"
       VERBOSE: "true"
     steps:
       - name: "Cleanup repo"
diff --git a/.github/workflows/special-tests.yml 
b/.github/workflows/special-tests.yml
index 000b5aa3d9..e09b813acf 100644
--- a/.github/workflows/special-tests.yml
+++ b/.github/workflows/special-tests.yml
@@ -193,6 +193,29 @@ jobs:
       run-coverage: ${{ inputs.run-coverage }}
       debug-resources: ${{ inputs.debug-resources }}
 
+  tests-database-isolation:
+    name: "Database isolation test"
+    uses: ./.github/workflows/run-unit-tests.yml
+    permissions:
+      contents: read
+      packages: read
+    secrets: inherit
+    with:
+      runs-on-as-json-default: ${{ inputs.runs-on-as-json-default }}
+      enable-aip-44: "true"
+      database-isolation: "true"
+      test-name: "DatabaseIsolation-Postgres"
+      test-scope: "DB"
+      backend: "postgres"
+      image-tag: ${{ inputs.image-tag }}
+      python-versions: "['${{ inputs.default-python-version }}']"
+      backend-versions: "['${{ inputs.default-postgres-version }}']"
+      excludes: "[]"
+      parallel-test-types-list-as-string: ${{ 
inputs.parallel-test-types-list-as-string }}
+      include-success-outputs: ${{ 
needs.build-info.outputs.include-success-outputs }}
+      run-coverage: ${{ inputs.run-coverage }}
+      debug-resources: ${{ inputs.debug-resources }}
+
   tests-quarantined:
     name: "Quarantined test"
     uses: ./.github/workflows/run-unit-tests.yml
diff --git a/Dockerfile b/Dockerfile
index 8c4a43274f..5eb2b33556 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -50,7 +50,7 @@ ARG AIRFLOW_VERSION="2.9.3"
 ARG PYTHON_BASE_IMAGE="python:3.8-slim-bookworm"
 
 ARG AIRFLOW_PIP_VERSION=24.2
-ARG AIRFLOW_UV_VERSION=0.2.34
+ARG AIRFLOW_UV_VERSION=0.2.37
 ARG AIRFLOW_USE_UV="false"
 ARG UV_HTTP_TIMEOUT="300"
 ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow";
diff --git a/Dockerfile.ci b/Dockerfile.ci
index 14ccb669f6..f46890cd81 100644
--- a/Dockerfile.ci
+++ b/Dockerfile.ci
@@ -1022,16 +1022,17 @@ function check_boto_upgrade() {
     echo "${COLOR_BLUE}Upgrading boto3, botocore to latest version to run 
Amazon tests with them${COLOR_RESET}"
     echo
     # shellcheck disable=SC2086
-    ${PACKAGING_TOOL_CMD} uninstall ${EXTRA_UNINSTALL_FLAGS} aiobotocore s3fs 
yandexcloud || true
+    ${PACKAGING_TOOL_CMD} uninstall ${EXTRA_UNINSTALL_FLAGS} aiobotocore s3fs 
yandexcloud opensearch-py || true
     # We need to include few dependencies to pass pip check with other 
dependencies:
     #   * oss2 as dependency as otherwise jmespath will be bumped (sync with 
alibaba provider)
-    #   * gcloud-aio-auth limit is needed to be included as it bumps 
cryptography (sync with google provider)
+    #   * cryptography is kept for snowflake-connector-python limitation (sync 
with snowflake provider)
     #   * requests needs to be limited to be compatible with apache beam (sync 
with apache-beam provider)
     #   * yandexcloud requirements for requests does not match those of 
apache.beam and latest botocore
     #   Both requests and yandexcloud exclusion above might be removed after
     #   https://github.com/apache/beam/issues/32080 is addressed
-    #   When you remove yandexcloud from the above list, also remove it from 
"test_example_dags.py"
-    #   in "tests/always".
+    #   This is already addressed and planned for 2.59.0 release.
+    #   When you remove yandexcloud and opensearch from the above list, you 
can also remove the
+    #   optional providers_dependencies exclusions from "test_example_dags.py" 
in "tests/always".
     set -x
     # shellcheck disable=SC2086
     ${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade boto3 
botocore \
@@ -1068,8 +1069,9 @@ function check_pydantic() {
         echo
         echo "${COLOR_YELLOW}Downgrading Pydantic to < 2${COLOR_RESET}"
         echo
+        # Pydantic 1.10.17/1.10.15 conflicts with aws-sam-translator so we 
need to exclude it
         # shellcheck disable=SC2086
-        ${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade 
"pydantic<2.0.0"
+        ${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade 
"pydantic<2.0.0,!=1.10.17,!=1.10.15"
         pip check
     else
         echo
@@ -1310,7 +1312,7 @@ ARG DEFAULT_CONSTRAINTS_BRANCH="constraints-main"
 ARG AIRFLOW_CI_BUILD_EPOCH="10"
 ARG AIRFLOW_PRE_CACHED_PIP_PACKAGES="true"
 ARG AIRFLOW_PIP_VERSION=24.2
-ARG AIRFLOW_UV_VERSION=0.2.34
+ARG AIRFLOW_UV_VERSION=0.2.37
 ARG AIRFLOW_USE_UV="true"
 # Setup PIP
 # By default PIP install run without cache to make image smaller
@@ -1334,7 +1336,7 @@ ARG AIRFLOW_VERSION=""
 ARG ADDITIONAL_PIP_INSTALL_FLAGS=""
 
 ARG AIRFLOW_PIP_VERSION=24.2
-ARG AIRFLOW_UV_VERSION=0.2.34
+ARG AIRFLOW_UV_VERSION=0.2.37
 ARG AIRFLOW_USE_UV="true"
 
 ENV AIRFLOW_REPO=${AIRFLOW_REPO}\
diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py 
b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index ad65157ef9..c3d8b671fb 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -126,9 +126,9 @@ def initialize_method_map() -> dict[str, Callable]:
         # XCom.get_many, # Not supported because it returns query
         XCom.clear,
         XCom.set,
-        Variable.set,
-        Variable.update,
-        Variable.delete,
+        Variable._set,
+        Variable._update,
+        Variable._delete,
         DAG.fetch_callback,
         DAG.fetch_dagrun,
         DagRun.fetch_task_instances,
@@ -228,19 +228,20 @@ def internal_airflow_api(body: dict[str, Any]) -> 
APIResponse:
     except Exception:
         return log_and_build_error_response(message="Error deserializing 
parameters.", status=400)
 
-    log.info("Calling method %s\nparams: %s", method_name, params)
+    log.debug("Calling method %s\nparams: %s", method_name, params)
     try:
         # Session must be created there as it may be needed by serializer for 
lazy-loaded fields.
         with create_session() as session:
             output = handler(**params, session=session)
             output_json = BaseSerialization.serialize(output, 
use_pydantic_models=True)
             response = json.dumps(output_json) if output_json is not None else 
None
-            log.info("Sending response: %s", response)
+            log.debug("Sending response: %s", response)
             return Response(response=response, headers={"Content-Type": 
"application/json"})
-    except AirflowException as e:  # In case of AirflowException transport the 
exception class back to caller
+    # In case of AirflowException or other selective known types, transport 
the exception class back to caller
+    except (KeyError, AttributeError, AirflowException) as e:
         exception_json = BaseSerialization.serialize(e, 
use_pydantic_models=True)
         response = json.dumps(exception_json)
-        log.info("Sending exception response: %s", response)
+        log.debug("Sending exception response: %s", response)
         return Response(response=response, headers={"Content-Type": 
"application/json"})
     except Exception:
         return log_and_build_error_response(message=f"Error executing method 
'{method_name}'.", status=500)
diff --git a/airflow/api_internal/internal_api_call.py 
b/airflow/api_internal/internal_api_call.py
index fc0945b3c0..8838377877 100644
--- a/airflow/api_internal/internal_api_call.py
+++ b/airflow/api_internal/internal_api_call.py
@@ -159,7 +159,7 @@ def internal_api_call(func: Callable[PS, RT]) -> 
Callable[PS, RT]:
         if result is None or result == b"":
             return None
         result = BaseSerialization.deserialize(json.loads(result), 
use_pydantic_models=True)
-        if isinstance(result, AirflowException):
+        if isinstance(result, (KeyError, AttributeError, AirflowException)):
             raise result
         return result
 
diff --git a/airflow/executors/base_executor.py 
b/airflow/executors/base_executor.py
index dd0b8a66d2..57568af199 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -467,7 +467,7 @@ class BaseExecutor(LoggingMixin):
                 span.set_attribute("dag_id", key.dag_id)
                 span.set_attribute("run_id", key.run_id)
                 span.set_attribute("task_id", key.task_id)
-                span.set_attribute("try_number", key.try_number - 1)
+                span.set_attribute("try_number", key.try_number)
 
         self.change_state(key, TaskInstanceState.SUCCESS, info)
 
diff --git a/airflow/executors/local_executor.py 
b/airflow/executors/local_executor.py
index afa51b1d86..32bba42082 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -277,7 +277,7 @@ class LocalExecutor(BaseExecutor):
                 span.set_attribute("dag_id", key.dag_id)
                 span.set_attribute("run_id", key.run_id)
                 span.set_attribute("task_id", key.task_id)
-                span.set_attribute("try_number", key.try_number - 1)
+                span.set_attribute("try_number", key.try_number)
                 span.set_attribute("commands_to_run", str(command))
 
             local_worker = LocalWorker(self.executor.result_queue, key=key, 
command=command)
diff --git a/airflow/executors/sequential_executor.py 
b/airflow/executors/sequential_executor.py
index 1b145892eb..5e9542d915 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -76,7 +76,7 @@ class SequentialExecutor(BaseExecutor):
             span.set_attribute("dag_id", key.dag_id)
             span.set_attribute("run_id", key.run_id)
             span.set_attribute("task_id", key.task_id)
-            span.set_attribute("try_number", key.try_number - 1)
+            span.set_attribute("try_number", key.try_number)
             span.set_attribute("commands_to_run", str(self.commands_to_run))
 
     def sync(self) -> None:
diff --git a/airflow/jobs/scheduler_job_runner.py 
b/airflow/jobs/scheduler_job_runner.py
index 163bf5b714..ba5f90c68b 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -837,7 +837,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 span.set_attribute("hostname", ti.hostname)
                 span.set_attribute("log_url", ti.log_url)
                 span.set_attribute("operator", str(ti.operator))
-                span.set_attribute("try_number", ti.try_number - 1)
+                span.set_attribute("try_number", ti.try_number)
                 span.set_attribute("executor_state", state)
                 span.set_attribute("job_id", ti.job_id)
                 span.set_attribute("pool", ti.pool)
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 7ffa596ec6..f21db0c675 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -1516,10 +1516,11 @@ class BaseOperator(AbstractOperator, 
metaclass=BaseOperatorMeta):
                     data_interval=info.data_interval,
                 )
                 ti = TaskInstance(self, run_id=dr.run_id)
+                session.add(ti)
                 ti.dag_run = dr
                 session.add(dr)
                 session.flush()
-
+                session.commit()
             ti.run(
                 mark_success=mark_success,
                 ignore_depends_on_past=ignore_depends_on_past,
diff --git a/airflow/models/variable.py b/airflow/models/variable.py
index 63b71303bc..563cac46e8 100644
--- a/airflow/models/variable.py
+++ b/airflow/models/variable.py
@@ -154,7 +154,6 @@ class Variable(Base, LoggingMixin):
 
     @staticmethod
     @provide_session
-    @internal_api_call
     def set(
         key: str,
         value: Any,
@@ -167,6 +166,35 @@ class Variable(Base, LoggingMixin):
 
         This operation overwrites an existing variable.
 
+        :param key: Variable Key
+        :param value: Value to set for the Variable
+        :param description: Description of the Variable
+        :param serialize_json: Serialize the value to a JSON string
+        :param session: Session
+        """
+        Variable._set(
+            key=key, value=value, description=description, 
serialize_json=serialize_json, session=session
+        )
+        # invalidate key in cache for faster propagation
+        # we cannot save the value set because it's possible that it's 
shadowed by a custom backend
+        # (see call to check_for_write_conflict above)
+        SecretCache.invalidate_variable(key)
+
+    @staticmethod
+    @provide_session
+    @internal_api_call
+    def _set(
+        key: str,
+        value: Any,
+        description: str | None = None,
+        serialize_json: bool = False,
+        session: Session = None,
+    ) -> None:
+        """
+        Set a value for an Airflow Variable with a given Key.
+
+        This operation overwrites an existing variable.
+
         :param key: Variable Key
         :param value: Value to set for the Variable
         :param description: Description of the Variable
@@ -190,7 +218,6 @@ class Variable(Base, LoggingMixin):
 
     @staticmethod
     @provide_session
-    @internal_api_call
     def update(
         key: str,
         value: Any,
@@ -200,6 +227,27 @@ class Variable(Base, LoggingMixin):
         """
         Update a given Airflow Variable with the Provided value.
 
+        :param key: Variable Key
+        :param value: Value to set for the Variable
+        :param serialize_json: Serialize the value to a JSON string
+        :param session: Session
+        """
+        Variable._update(key=key, value=value, serialize_json=serialize_json, 
session=session)
+        # We need to invalidate the cache for internal API cases on the client 
side
+        SecretCache.invalidate_variable(key)
+
+    @staticmethod
+    @provide_session
+    @internal_api_call
+    def _update(
+        key: str,
+        value: Any,
+        serialize_json: bool = False,
+        session: Session = None,
+    ) -> None:
+        """
+        Update a given Airflow Variable with the Provided value.
+
         :param key: Variable Key
         :param value: Value to set for the Variable
         :param serialize_json: Serialize the value to a JSON string
@@ -219,11 +267,23 @@ class Variable(Base, LoggingMixin):
 
     @staticmethod
     @provide_session
-    @internal_api_call
     def delete(key: str, session: Session = None) -> int:
         """
         Delete an Airflow Variable for a given key.
 
+        :param key: Variable Keys
+        """
+        rows = Variable._delete(key=key, session=session)
+        SecretCache.invalidate_variable(key)
+        return rows
+
+    @staticmethod
+    @provide_session
+    @internal_api_call
+    def _delete(key: str, session: Session = None) -> int:
+        """
+        Delete an Airflow Variable for a given key.
+
         :param key: Variable Keys
         """
         rows = session.execute(delete(Variable).where(Variable.key == 
key)).rowcount
diff --git a/airflow/providers/amazon/provider.yaml 
b/airflow/providers/amazon/provider.yaml
index c69542ef33..1c6a86f180 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -101,7 +101,7 @@ dependencies:
   - botocore>=1.34.90
   - inflection>=0.5.1
   # Allow a wider range of watchtower versions for flexibility among users
-  - watchtower>=3.0.0,<4
+  - watchtower>=3.0.0,!=3.3.0,<4
   - jsonpath_ng>=1.5.3
   - redshift_connector>=2.0.918
   - sqlalchemy_redshift>=0.8.6
diff --git a/airflow/serialization/enums.py b/airflow/serialization/enums.py
index a5bd5e3646..f216ce7316 100644
--- a/airflow/serialization/enums.py
+++ b/airflow/serialization/enums.py
@@ -46,6 +46,7 @@ class DagAttributeTypes(str, Enum):
     RELATIVEDELTA = "relativedelta"
     BASE_TRIGGER = "base_trigger"
     AIRFLOW_EXC_SER = "airflow_exc_ser"
+    BASE_EXC_SER = "base_exc_ser"
     DICT = "dict"
     SET = "set"
     TUPLE = "tuple"
diff --git a/airflow/serialization/serialized_objects.py 
b/airflow/serialization/serialized_objects.py
index d110271c3d..a3886aa49a 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -692,6 +692,15 @@ class BaseSerialization:
                 ),
                 type_=DAT.AIRFLOW_EXC_SER,
             )
+        elif isinstance(var, (KeyError, AttributeError)):
+            return cls._encode(
+                cls.serialize(
+                    {"exc_cls_name": var.__class__.__name__, "args": 
[var.args], "kwargs": {}},
+                    use_pydantic_models=use_pydantic_models,
+                    strict=strict,
+                ),
+                type_=DAT.BASE_EXC_SER,
+            )
         elif isinstance(var, BaseTrigger):
             return cls._encode(
                 cls.serialize(var.serialize(), 
use_pydantic_models=use_pydantic_models, strict=strict),
@@ -834,13 +843,16 @@ class BaseSerialization:
             return decode_timezone(var)
         elif type_ == DAT.RELATIVEDELTA:
             return decode_relativedelta(var)
-        elif type_ == DAT.AIRFLOW_EXC_SER:
+        elif type_ == DAT.AIRFLOW_EXC_SER or type_ == DAT.BASE_EXC_SER:
             deser = cls.deserialize(var, 
use_pydantic_models=use_pydantic_models)
             exc_cls_name = deser["exc_cls_name"]
             args = deser["args"]
             kwargs = deser["kwargs"]
             del deser
-            exc_cls = import_string(exc_cls_name)
+            if type_ == DAT.AIRFLOW_EXC_SER:
+                exc_cls = import_string(exc_cls_name)
+            else:
+                exc_cls = import_string(f"builtins.{exc_cls_name}")
             return exc_cls(*args, **kwargs)
         elif type_ == DAT.BASE_TRIGGER:
             tr_cls_name, kwargs = cls.deserialize(var, 
use_pydantic_models=use_pydantic_models)
diff --git a/airflow/settings.py b/airflow/settings.py
index 751bb38760..175a63f69d 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -313,6 +313,8 @@ AIRFLOW_TESTS_PATH = os.path.join(AIRFLOW_PATH, "tests")
 AIRFLOW_SETTINGS_PATH = os.path.join(AIRFLOW_PATH, "airflow", "settings.py")
 AIRFLOW_UTILS_SESSION_PATH = os.path.join(AIRFLOW_PATH, "airflow", "utils", 
"session.py")
 AIRFLOW_MODELS_BASEOPERATOR_PATH = os.path.join(AIRFLOW_PATH, "airflow", 
"models", "baseoperator.py")
+AIRFLOW_MODELS_DAG_PATH = os.path.join(AIRFLOW_PATH, "airflow", "models", 
"dag.py")
+AIRFLOW_DB_UTILS_PATH = os.path.join(AIRFLOW_PATH, "airflow", "utils", "db.py")
 
 
 class TracebackSessionForTests:
@@ -370,6 +372,9 @@ class TracebackSessionForTests:
         :return: True if the object was created from test code, False 
otherwise.
         """
         self.traceback = traceback.extract_stack()
+        if any(filename.endswith("_pytest/fixtures.py") for filename, _, _, _ 
in self.traceback):
+            # This is a fixture call
+            return True, None
         airflow_frames = [
             tb
             for tb in self.traceback
@@ -378,24 +383,30 @@ class TracebackSessionForTests:
             and not tb.filename == AIRFLOW_UTILS_SESSION_PATH
         ]
         if any(
-            filename.endswith("conftest.py") or 
filename.endswith("tests/test_utils/db.py")
-            for filename, _, _, _ in airflow_frames
+            filename.endswith("conftest.py")
+            or filename.endswith("tests/test_utils/db.py")
+            or (filename.startswith(AIRFLOW_TESTS_PATH) and name in 
("setup_method", "teardown_method"))
+            for filename, _, name, _ in airflow_frames
         ):
             # This is a fixture call or testing utilities
             return True, None
-        if (
-            len(airflow_frames) >= 2
-            and airflow_frames[-2].filename.startswith(AIRFLOW_TESTS_PATH)
-            and airflow_frames[-1].filename == AIRFLOW_MODELS_BASEOPERATOR_PATH
-            and airflow_frames[-1].name == "run"
-        ):
-            # This is baseoperator run method that is called directly from the 
test code and this is
-            # usual pattern where we create a session in the test code to 
create dag_runs for tests.
-            # If `run` code will be run inside a real "airflow" code the stack 
trace would be longer
-            # and it would not be directly called from the test code. Also if 
subsequently any of the
-            # run_task() method called later from the task code will attempt 
to execute any DB
-            # method, the stack trace will be longer and we will catch it as 
"illegal" call.
-            return True, None
+        if len(airflow_frames) >= 2 and 
airflow_frames[-2].filename.startswith(AIRFLOW_TESTS_PATH):
+            # Let's look at what we are calling directly from the test code
+            current_filename, current_method_name = 
airflow_frames[-1].filename, airflow_frames[-1].name
+            if (current_filename, current_method_name) in (
+                (AIRFLOW_MODELS_BASEOPERATOR_PATH, "run"),
+                (AIRFLOW_MODELS_DAG_PATH, "create_dagrun"),
+            ):
+                # This is baseoperator run method that is called directly from 
the test code and this is
+                # usual pattern where we create a session in the test code to 
create dag_runs for tests.
+                # If `run` code will be run inside a real "airflow" code the 
stack trace would be longer
+                # and it would not be directly called from the test code. Also 
if subsequently any of the
+                # run_task() method called later from the task code will 
attempt to execute any DB
+                # method, the stack trace will be longer and we will catch it 
as "illegal" call.
+                return True, None
+            if current_filename == AIRFLOW_DB_UTILS_PATH:
+                # This is a util method called directly from the test code
+                return True, None
         for tb in airflow_frames[::-1]:
             if tb.filename.startswith(AIRFLOW_PATH):
                 if tb.filename.startswith(AIRFLOW_TESTS_PATH):
@@ -407,6 +418,16 @@ class TracebackSessionForTests:
         # The traceback line will be always 3rd (two bottom ones are Airflow)
         return False, self.traceback[-2]
 
+    def get_bind(
+        self,
+        mapper=None,
+        clause=None,
+        bind=None,
+        _sa_skip_events=None,
+        _sa_skip_for_implicit_returning=False,
+    ):
+        pass
+
 
 def _is_sqlite_db_path_relative(sqla_conn_str: str) -> bool:
     """Determine whether the database connection URI specifies a relative 
path."""
diff --git a/airflow/traces/utils.py b/airflow/traces/utils.py
index afab2591d5..9932c249f0 100644
--- a/airflow/traces/utils.py
+++ b/airflow/traces/utils.py
@@ -22,7 +22,6 @@ from typing import TYPE_CHECKING
 
 from airflow.traces import NO_TRACE_ID
 from airflow.utils.hashlib_wrapper import md5
-from airflow.utils.state import TaskInstanceState
 
 if TYPE_CHECKING:
     from airflow.models import DagRun, TaskInstance
@@ -75,12 +74,8 @@ def gen_dag_span_id(dag_run: DagRun, as_int: bool = False) 
-> str | int:
 def gen_span_id(ti: TaskInstance, as_int: bool = False) -> str | int:
     """Generate span id from the task instance."""
     dag_run = ti.dag_run
-    if ti.state == TaskInstanceState.SUCCESS or ti.state == 
TaskInstanceState.FAILED:
-        try_number = ti.try_number - 1
-    else:
-        try_number = ti.try_number
     return _gen_id(
-        [dag_run.dag_id, dag_run.run_id, ti.task_id, str(try_number)],
+        [dag_run.dag_id, dag_run.run_id, ti.task_id, str(ti.try_number)],
         as_int,
         SPAN_ID,
     )
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 236beed451..a485f84ed4 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2163,7 +2163,7 @@ class Airflow(AirflowBaseView):
             .limit(num_recent_confs)
         )
         recent_confs = {
-            run_id: json.dumps(run_conf)
+            run_id: json.dumps(run_conf, cls=utils_json.WebEncoder)
             for run_id, run_conf in ((run.run_id, run.conf) for run in 
recent_runs)
             if isinstance(run_conf, dict) and any(run_conf)
         }
@@ -2198,6 +2198,7 @@ class Airflow(AirflowBaseView):
                         },
                         indent=4,
                         ensure_ascii=False,
+                        cls=utils_json.WebEncoder,
                     )
                 except TypeError:
                     flash("Could not pre-populate conf field due to 
non-JSON-serializable data-types")
diff --git a/chart/values.schema.json b/chart/values.schema.json
index 5f6a3b55d8..4ba434c341 100644
--- a/chart/values.schema.json
+++ b/chart/values.schema.json
@@ -671,7 +671,7 @@
                         "tag": {
                             "description": "The StatsD image tag.",
                             "type": "string",
-                            "default": "v0.26.1"
+                            "default": "v0.27.1"
                         },
                         "pullPolicy": {
                             "description": "The StatsD image pull policy.",
diff --git a/chart/values.yaml b/chart/values.yaml
index 76130e55bb..a28c3c6d54 100644
--- a/chart/values.yaml
+++ b/chart/values.yaml
@@ -105,7 +105,7 @@ images:
     pullPolicy: IfNotPresent
   statsd:
     repository: quay.io/prometheus/statsd-exporter
-    tag: v0.26.1
+    tag: v0.27.1
     pullPolicy: IfNotPresent
   redis:
     repository: redis
diff --git a/clients/python/CHANGELOG.md b/clients/python/CHANGELOG.md
index d56d692d3c..6a254029a4 100644
--- a/clients/python/CHANGELOG.md
+++ b/clients/python/CHANGELOG.md
@@ -17,6 +17,26 @@
  under the License.
  -->
 
+# v2.10.0
+
+## Major changes:
+
+   - Add dag_stats rest api endpoint 
([#41017](https://github.com/apache/airflow/pull/41017))
+   - AIP-64: Add task instance history list endpoint 
([#40988](https://github.com/apache/airflow/pull/40988))
+   - Change DAG Audit log tab to Event Log 
([#40967](https://github.com/apache/airflow/pull/40967))
+   - AIP-64: Add REST API endpoints for TI try level details 
([#40441](https://github.com/apache/airflow/pull/40441))
+   - Make XCom display as react json 
([#40640](https://github.com/apache/airflow/pull/40640))
+   - Replace usages of task context logger with the log table 
([#40867](https://github.com/apache/airflow/pull/40867))
+   - Fix tasks API endpoint when DAG doesn't have `start_date` 
([#40878](https://github.com/apache/airflow/pull/40878))
+   - Add try_number to log table 
([#40739](https://github.com/apache/airflow/pull/40739))
+   - Add executor field to the task instance API 
([#40034](https://github.com/apache/airflow/pull/40034))
+   - Add task documentation to details tab in grid view. 
([#39899](https://github.com/apache/airflow/pull/39899))
+   - Add max_consecutive_failed_dag_runs in API spec 
([#39830](https://github.com/apache/airflow/pull/39830))
+   - Add task failed dependencies to details page. 
([#38449](https://github.com/apache/airflow/pull/38449))
+   - Add dag re-parsing request endpoint 
([#39138](https://github.com/apache/airflow/pull/39138))
+   - Reorder OpenAPI Spec tags alphabetically 
([#38717](https://github.com/apache/airflow/pull/38717))
+
+
 # v2.9.1
 
 ## Major changes:
diff --git a/clients/python/version.txt b/clients/python/version.txt
index dedcc7d433..10c2c0c3d6 100644
--- a/clients/python/version.txt
+++ b/clients/python/version.txt
@@ -1 +1 @@
-2.9.1
+2.10.0
diff --git a/dev/breeze/src/airflow_breeze/commands/testing_commands.py 
b/dev/breeze/src/airflow_breeze/commands/testing_commands.py
index 51ca4bea5d..cef51d9752 100644
--- a/dev/breeze/src/airflow_breeze/commands/testing_commands.py
+++ b/dev/breeze/src/airflow_breeze/commands/testing_commands.py
@@ -206,6 +206,7 @@ def _run_test(
             helm_test_package=None,
             keep_env_variables=shell_params.keep_env_variables,
             no_db_cleanup=shell_params.no_db_cleanup,
+            database_isolation=shell_params.database_isolation,
         )
     )
     run_cmd.extend(list(extra_pytest_args))
@@ -968,6 +969,7 @@ def helm_tests(
         helm_test_package=helm_test_package,
         keep_env_variables=False,
         no_db_cleanup=False,
+        database_isolation=False,
     )
     cmd = ["docker", "compose", "run", "--service-ports", "--rm", "airflow", 
*pytest_args, *extra_pytest_args]
     result = run_command(cmd, check=False, env=env, 
output_outside_the_group=True)
diff --git 
a/dev/breeze/src/airflow_breeze/templates/CHANGELOG_TEMPLATE.rst.jinja2 
b/dev/breeze/src/airflow_breeze/templates/CHANGELOG_TEMPLATE.rst.jinja2
index b8a966448c..e51939c575 100644
--- a/dev/breeze/src/airflow_breeze/templates/CHANGELOG_TEMPLATE.rst.jinja2
+++ b/dev/breeze/src/airflow_breeze/templates/CHANGELOG_TEMPLATE.rst.jinja2
@@ -40,7 +40,7 @@ Features
 {%- endif %}
 
 
-{%- if classified_changes.fixes %}
+{%- if classified_changes and classified_changes.fixes %}
 
 Bug Fixes
 ~~~~~~~~~
@@ -50,7 +50,7 @@ Bug Fixes
 {%- endif %}
 
 
-{%- if classified_changes.misc %}
+{%- if classified_changes and classified_changes.misc %}
 
 Misc
 ~~~~
@@ -62,7 +62,7 @@ Misc
 
 .. Below changes are excluded from the changelog. Move them to
    appropriate section above if needed. Do not delete the lines(!):
-{%- if classified_changes.other %}
+{%- if classified_changes and classified_changes.other %}
    {%- for other in classified_changes.other %}
    * ``{{ other.message_without_backticks | safe }}``
    {%- endfor %}
diff --git a/dev/breeze/src/airflow_breeze/utils/run_tests.py 
b/dev/breeze/src/airflow_breeze/utils/run_tests.py
index fe099efbc0..73cbb43081 100644
--- a/dev/breeze/src/airflow_breeze/utils/run_tests.py
+++ b/dev/breeze/src/airflow_breeze/utils/run_tests.py
@@ -311,6 +311,7 @@ def generate_args_for_pytest(
     helm_test_package: str | None,
     keep_env_variables: bool,
     no_db_cleanup: bool,
+    database_isolation: bool,
 ):
     result_log_file, warnings_file, coverage_file = test_paths(test_type, 
backend, helm_test_package)
     if skip_db_tests:
@@ -327,12 +328,13 @@ def generate_args_for_pytest(
             helm_test_package=helm_test_package,
             python_version=python_version,
         )
+    max_fail = 50
     args.extend(
         [
             "--verbosity=0",
             "--strict-markers",
             "--durations=100",
-            "--maxfail=50",
+            f"--maxfail={max_fail}",
             "--color=yes",
             f"--junitxml={result_log_file}",
             # timeouts in seconds for individual tests
@@ -374,7 +376,7 @@ def generate_args_for_pytest(
     args.extend(get_excluded_provider_args(python_version))
     if use_xdist:
         args.extend(["-n", str(parallelism) if parallelism else "auto"])
-    # We have to disabke coverage for Python 3.12 because of the issue with 
coverage that takes too long, despite
+    # We have to disable coverage for Python 3.12 because of the issue with 
coverage that takes too long, despite
     # Using experimental support for Python 3.12 PEP 669. The coverage.py is 
not yet fully compatible with the
     # full scope of PEP-669. That will be fully done when 
https://github.com/nedbat/coveragepy/issues/1746 is
     # resolve for now we are disabling coverage for Python 3.12, and it causes 
slower execution and occasional
@@ -417,5 +419,13 @@ def convert_parallel_types_to_folders(
                 python_version=python_version,
             )
         )
-    # leave only folders, strip --pytest-args
-    return [arg for arg in args if arg.startswith("test")]
+    # leave only folders, strip --pytest-args that exclude some folders with 
`-' prefix
+    folders = [arg for arg in args if arg.startswith("test")]
+    # remove specific provider sub-folders if "tests/providers" is already in 
the list
+    # This workarounds pytest issues where it will only run tests from 
specific subfolders
+    # if both parent and child folders are in the list
+    # The issue in Pytest (changed behaviour in Pytest 8.2 is tracked here
+    # https://github.com/pytest-dev/pytest/issues/12605
+    if "tests/providers" in folders:
+        folders = [folder for folder in folders if not 
folder.startswith("tests/providers/")]
+    return folders
diff --git a/dev/breeze/src/airflow_breeze/utils/selective_checks.py 
b/dev/breeze/src/airflow_breeze/utils/selective_checks.py
index 62f5a20abc..224e76c251 100644
--- a/dev/breeze/src/airflow_breeze/utils/selective_checks.py
+++ b/dev/breeze/src/airflow_breeze/utils/selective_checks.py
@@ -861,6 +861,10 @@ class SelectiveChecks:
         if "Providers" in current_test_types:
             current_test_types.remove("Providers")
             current_test_types.update({f"Providers[{provider}]" for provider 
in get_available_packages()})
+        if self.skip_provider_tests:
+            current_test_types = {
+                test_type for test_type in current_test_types if not 
test_type.startswith("Providers")
+            }
         return " ".join(sorted(current_test_types))
 
     @cached_property
diff --git a/dev/breeze/tests/test_pytest_args_for_test_types.py 
b/dev/breeze/tests/test_pytest_args_for_test_types.py
index a64dccbd06..fbb3785949 100644
--- a/dev/breeze/tests/test_pytest_args_for_test_types.py
+++ b/dev/breeze/tests/test_pytest_args_for_test_types.py
@@ -329,6 +329,19 @@ def 
test_pytest_args_for_helm_test_types(helm_test_package: str, pytest_args: li
             ],
             True,
         ),
+        (
+            "Core Providers[-amazon,google] Providers[amazon] 
Providers[google]",
+            [
+                "tests/core",
+                "tests/executors",
+                "tests/jobs",
+                "tests/models",
+                "tests/ti_deps",
+                "tests/utils",
+                "tests/providers",
+            ],
+            False,
+        ),
     ],
 )
 def test_folders_for_parallel_test_types(
diff --git a/dev/breeze/tests/test_selective_checks.py 
b/dev/breeze/tests/test_selective_checks.py
index 7c0ca94094..6bee6bbc7e 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -1075,7 +1075,7 @@ def test_full_test_needed_when_scripts_changes(files: 
tuple[str, ...], expected_
         ),
         (
             pytest.param(
-                ("INTHEWILD.md",),
+                ("INTHEWILD.md", "tests/providers/asana.py"),
                 ("full tests needed",),
                 "v2-7-stable",
                 {
@@ -1097,6 +1097,9 @@ def test_full_test_needed_when_scripts_changes(files: 
tuple[str, ...], expected_
                     "parallel-test-types-list-as-string": "API Always 
BranchExternalPython "
                     "BranchPythonVenv CLI Core ExternalPython Operators Other 
PlainAsserts "
                     "PythonVenv Serialization WWW",
+                    "separate-test-types-list-as-string": "API Always 
BranchExternalPython "
+                    "BranchPythonVenv CLI Core ExternalPython Operators Other 
PlainAsserts "
+                    "PythonVenv Serialization WWW",
                     "needs-mypy": "true",
                     "mypy-folders": "['airflow', 'docs', 'dev']",
                 },
diff --git a/generated/provider_dependencies.json 
b/generated/provider_dependencies.json
index 58fb34b2ad..1ac11366c6 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -41,7 +41,7 @@
       "jsonpath_ng>=1.5.3",
       "redshift_connector>=2.0.918",
       "sqlalchemy_redshift>=0.8.6",
-      "watchtower>=3.0.0,<4"
+      "watchtower>=3.0.0,!=3.3.0,<4"
     ],
     "devel-deps": [
       "aiobotocore>=2.13.0",
diff --git a/pyproject.toml b/pyproject.toml
index 621a9e48b8..194c4b268c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -405,6 +405,9 @@ combine-as-imports = true
 "tests/providers/google/cloud/operators/vertex_ai/test_generative_model.py" = 
["E402"]
 "tests/providers/google/cloud/triggers/test_vertex_ai.py" = ["E402"]
 "tests/providers/openai/hooks/test_openai.py" = ["E402"]
+"tests/providers/opensearch/conftest.py" = ["E402"]
+"tests/providers/opensearch/hooks/test_opensearch.py" = ["E402"]
+"tests/providers/opensearch/operators/test_opensearch.py" = ["E402"]
 "tests/providers/openai/operators/test_openai.py" = ["E402"]
 "tests/providers/qdrant/hooks/test_qdrant.py" = ["E402"]
 "tests/providers/qdrant/operators/test_qdrant.py" = ["E402"]
diff --git a/scripts/docker/entrypoint_ci.sh b/scripts/docker/entrypoint_ci.sh
index 1ff3ef0cd2..6e9b36507b 100755
--- a/scripts/docker/entrypoint_ci.sh
+++ b/scripts/docker/entrypoint_ci.sh
@@ -242,16 +242,17 @@ function check_boto_upgrade() {
     echo "${COLOR_BLUE}Upgrading boto3, botocore to latest version to run 
Amazon tests with them${COLOR_RESET}"
     echo
     # shellcheck disable=SC2086
-    ${PACKAGING_TOOL_CMD} uninstall ${EXTRA_UNINSTALL_FLAGS} aiobotocore s3fs 
yandexcloud || true
+    ${PACKAGING_TOOL_CMD} uninstall ${EXTRA_UNINSTALL_FLAGS} aiobotocore s3fs 
yandexcloud opensearch-py || true
     # We need to include few dependencies to pass pip check with other 
dependencies:
     #   * oss2 as dependency as otherwise jmespath will be bumped (sync with 
alibaba provider)
-    #   * gcloud-aio-auth limit is needed to be included as it bumps 
cryptography (sync with google provider)
+    #   * cryptography is kept for snowflake-connector-python limitation (sync 
with snowflake provider)
     #   * requests needs to be limited to be compatible with apache beam (sync 
with apache-beam provider)
     #   * yandexcloud requirements for requests does not match those of 
apache.beam and latest botocore
     #   Both requests and yandexcloud exclusion above might be removed after
     #   https://github.com/apache/beam/issues/32080 is addressed
-    #   When you remove yandexcloud from the above list, also remove it from 
"test_example_dags.py"
-    #   in "tests/always".
+    #   This is already addressed and planned for 2.59.0 release.
+    #   When you remove yandexcloud and opensearch from the above list, you 
can also remove the
+    #   optional providers_dependencies exclusions from "test_example_dags.py" 
in "tests/always".
     set -x
     # shellcheck disable=SC2086
     ${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade boto3 
botocore \
@@ -289,8 +290,9 @@ function check_pydantic() {
         echo
         echo "${COLOR_YELLOW}Downgrading Pydantic to < 2${COLOR_RESET}"
         echo
+        # Pydantic 1.10.17/1.10.15 conflicts with aws-sam-translator so we 
need to exclude it
         # shellcheck disable=SC2086
-        ${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade 
"pydantic<2.0.0"
+        ${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade 
"pydantic<2.0.0,!=1.10.17,!=1.10.15"
         pip check
     else
         echo
diff --git a/tests/always/test_example_dags.py 
b/tests/always/test_example_dags.py
index 2b5f37631a..b8e3edb99b 100644
--- a/tests/always/test_example_dags.py
+++ b/tests/always/test_example_dags.py
@@ -17,6 +17,7 @@
 from __future__ import annotations
 
 import os
+import re
 import sys
 from glob import glob
 from importlib import metadata as importlib_metadata
@@ -39,8 +40,11 @@ OPTIONAL_PROVIDERS_DEPENDENCIES: dict[str, dict[str, str | 
None]] = {
     # Some examples or system tests may depend on additional packages
     # that are not included in certain CI checks.
     # The format of the dictionary is as follows:
-    # key: the prefix of the file to be excluded,
+    # key: the regexp matching the file to be excluded,
     # value: a dictionary containing package distributions with an optional 
version specifier, e.g., >=2.3.4
+    ".*example_bedrock_retrieve_and_generate.py": {"opensearch-py": None},
+    ".*example_opensearch.py": {"opensearch-py": None},
+    r".*example_yandexcloud.*\.py": {"yandexcloud": None},
 }
 IGNORE_AIRFLOW_PROVIDER_DEPRECATION_WARNING: tuple[str, ...] = (
     # Certain examples or system tests may trigger 
AirflowProviderDeprecationWarnings.
@@ -124,13 +128,6 @@ def example_not_excluded_dags(xfail_db_exception: bool = 
False):
         for prefix in PROVIDERS_PREFIXES
         for provider in suspended_providers_folders
     ]
-    temporary_excluded_upgrade_boto_providers_folders = [
-        AIRFLOW_SOURCES_ROOT.joinpath(prefix, provider).as_posix()
-        for prefix in PROVIDERS_PREFIXES
-        # TODO - remove me when https://github.com/apache/beam/issues/32080 is 
addressed
-        #        and we bring back yandex to be run in case of upgrade boto
-        for provider in ["yandex"]
-    ]
     current_python_excluded_providers_folders = [
         AIRFLOW_SOURCES_ROOT.joinpath(prefix, provider).as_posix()
         for prefix in PROVIDERS_PREFIXES
@@ -146,18 +143,13 @@ def example_not_excluded_dags(xfail_db_exception: bool = 
False):
             if candidate.startswith(tuple(suspended_providers_folders)):
                 param_marks.append(pytest.mark.skip(reason="Suspended 
provider"))
 
-            if os.environ.get("UPGRADE_BOTO", "false") == "true" and 
candidate.startswith(
-                tuple(temporary_excluded_upgrade_boto_providers_folders)
-            ):
-                param_marks.append(pytest.mark.skip(reason="Temporary excluded 
upgrade boto provider"))
-
             if 
candidate.startswith(tuple(current_python_excluded_providers_folders)):
                 param_marks.append(
                     pytest.mark.skip(reason=f"Not supported for Python 
{CURRENT_PYTHON_VERSION}")
                 )
 
             for optional, dependencies in 
OPTIONAL_PROVIDERS_DEPENDENCIES.items():
-                if candidate.endswith(optional):
+                if re.match(optional, candidate):
                     for distribution_name, specifier in dependencies.items():
                         result, reason = 
match_optional_dependencies(distribution_name, specifier)
                         if not result:
diff --git a/tests/cli/commands/test_internal_api_command.py 
b/tests/cli/commands/test_internal_api_command.py
index 99992e6266..a1aaf2daca 100644
--- a/tests/cli/commands/test_internal_api_command.py
+++ b/tests/cli/commands/test_internal_api_command.py
@@ -83,6 +83,9 @@ class TestCLIGetNumReadyWorkersRunning:
             assert self.monitor._get_num_ready_workers_running() == 0
 
 
+# Those tests are skipped in isolation mode because they interfere with the 
internal API
+# server already running in the background in the isolation mode.
[email protected]_if_database_isolation_mode
 @pytest.mark.db_test
 @pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled")
 class TestCliInternalAPI(_ComonCLIGunicornTestClass):
diff --git a/tests/cli/commands/test_webserver_command.py 
b/tests/cli/commands/test_webserver_command.py
index 07d95a9e5f..fa2e58af9e 100644
--- a/tests/cli/commands/test_webserver_command.py
+++ b/tests/cli/commands/test_webserver_command.py
@@ -226,6 +226,9 @@ class TestCLIGetNumReadyWorkersRunning:
             assert self.monitor._get_num_ready_workers_running() == 0
 
 
+# Those tests are skipped in isolation mode because they interfere with the 
internal API
+# server already running in the background in the isolation mode.
[email protected]_if_database_isolation_mode
 @pytest.mark.db_test
 class TestCliWebServer(_ComonCLIGunicornTestClass):
     main_process_regexp = r"airflow webserver"
diff --git a/tests/decorators/test_bash.py b/tests/decorators/test_bash.py
index ba8948936e..9fa7999e83 100644
--- a/tests/decorators/test_bash.py
+++ b/tests/decorators/test_bash.py
@@ -33,6 +33,9 @@ from tests.test_utils.db import clear_db_dags, clear_db_runs, 
clear_rendered_ti_
 
 DEFAULT_DATE = timezone.datetime(2023, 1, 1)
 
+# TODO(potiuk) see why this test hangs in DB isolation mode
+pytestmark = pytest.mark.skip_if_database_isolation_mode
+
 
 @pytest.mark.db_test
 class TestBashDecorator:
diff --git a/tests/decorators/test_branch_external_python.py 
b/tests/decorators/test_branch_external_python.py
index d991f22cd5..d2466365be 100644
--- a/tests/decorators/test_branch_external_python.py
+++ b/tests/decorators/test_branch_external_python.py
@@ -24,7 +24,8 @@ import pytest
 from airflow.decorators import task
 from airflow.utils.state import State
 
-pytestmark = pytest.mark.db_test
+# TODO: (potiuk) - AIP-44 - check why this test hangs
+pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
 
 
 class Test_BranchExternalPythonDecoratedOperator:
diff --git a/tests/decorators/test_branch_python.py 
b/tests/decorators/test_branch_python.py
index 58bb216246..3cd95b8d2a 100644
--- a/tests/decorators/test_branch_python.py
+++ b/tests/decorators/test_branch_python.py
@@ -22,7 +22,8 @@ import pytest
 from airflow.decorators import task
 from airflow.utils.state import State
 
-pytestmark = pytest.mark.db_test
+# TODO: (potiuk) - AIP-44 - check why this test hangs
+pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
 
 
 class Test_BranchPythonDecoratedOperator:
diff --git a/tests/decorators/test_branch_virtualenv.py 
b/tests/decorators/test_branch_virtualenv.py
index a5c23de392..6cdfa1ddff 100644
--- a/tests/decorators/test_branch_virtualenv.py
+++ b/tests/decorators/test_branch_virtualenv.py
@@ -22,7 +22,8 @@ import pytest
 from airflow.decorators import task
 from airflow.utils.state import State
 
-pytestmark = pytest.mark.db_test
+# TODO: (potiuk) - AIP-44 - check why this test hangs
+pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
 
 
 class TestBranchPythonVirtualenvDecoratedOperator:
diff --git a/tests/decorators/test_condition.py 
b/tests/decorators/test_condition.py
index 315db6bfe0..28e0f0bf8f 100644
--- a/tests/decorators/test_condition.py
+++ b/tests/decorators/test_condition.py
@@ -28,7 +28,8 @@ if TYPE_CHECKING:
     from airflow.models.taskinstance import TaskInstance
     from airflow.utils.context import Context
 
-pytestmark = pytest.mark.db_test
+# TODO(potiuk) see why this test hangs in DB isolation mode
+pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
 
 
 @pytest.mark.skip_if_database_isolation_mode  # Test is broken in db isolation mode
diff --git a/tests/decorators/test_external_python.py b/tests/decorators/test_external_python.py
index 5ed5874e3a..0d9a439aa2 100644
--- a/tests/decorators/test_external_python.py
+++ b/tests/decorators/test_external_python.py
@@ -29,7 +29,7 @@ import pytest
 from airflow.decorators import setup, task, teardown
 from airflow.utils import timezone
 
-pytestmark = pytest.mark.db_test
+pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag]
 
 
 DEFAULT_DATE = timezone.datetime(2016, 1, 1)
diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py
index 9d2b9a14c8..067beff3ab 100644
--- a/tests/decorators/test_python.py
+++ b/tests/decorators/test_python.py
@@ -41,7 +41,7 @@ from airflow.utils.types import DagRunType
 from airflow.utils.xcom import XCOM_RETURN_KEY
 from tests.operators.test_python import BasePythonTest
 
-pytestmark = pytest.mark.db_test
+pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag]
 
 
 if TYPE_CHECKING:
@@ -281,6 +281,8 @@ class TestAirflowTaskDecorator(BasePythonTest):
                 def add_number(self, num: int) -> int:
                     return self.num + num
 
+    # TODO(potiuk) see why this test hangs in DB isolation mode
+    @pytest.mark.skip_if_database_isolation_mode
     def test_fail_multiple_outputs_key_type(self):
         @task_decorator(multiple_outputs=True)
         def add_number(num: int):
@@ -293,6 +295,8 @@ class TestAirflowTaskDecorator(BasePythonTest):
         with pytest.raises(AirflowException):
             ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
+    # TODO(potiuk) see why this test hangs in DB isolation mode
+    @pytest.mark.skip_if_database_isolation_mode
     def test_fail_multiple_outputs_no_dict(self):
         @task_decorator(multiple_outputs=True)
         def add_number(num: int):
@@ -541,6 +545,8 @@ class TestAirflowTaskDecorator(BasePythonTest):
 
         assert "add_2" in self.dag_non_serialized.task_ids
 
+    # TODO(potiuk) see why this test hangs in DB isolation mode
+    @pytest.mark.skip_if_database_isolation_mode
     def test_dag_task_multiple_outputs(self):
         """Tests dag.task property to generate task with multiple outputs"""
 
@@ -863,6 +869,7 @@ def test_task_decorator_has_wrapped_attr():
     assert decorated_test_func.__wrapped__ is org_test_func, "__wrapped__ attr is not the original function"
 
 
[email protected]_serialized_dag(False)
 @pytest.mark.skip_if_database_isolation_mode  # Test is broken in db isolation mode
 def test_upstream_exception_produces_none_xcom(dag_maker, session):
     from airflow.exceptions import AirflowSkipException
@@ -900,6 +907,7 @@ def test_upstream_exception_produces_none_xcom(dag_maker, session):
     assert result == "'example' None"
 
 
[email protected]_serialized_dag(False)
 @pytest.mark.skip_if_database_isolation_mode  # Test is broken in db isolation mode
 @pytest.mark.parametrize("multiple_outputs", [True, False])
 def test_multiple_outputs_produces_none_xcom_when_task_is_skipped(dag_maker, session, multiple_outputs):
@@ -958,6 +966,7 @@ def test_no_warnings(reset_logging_config, caplog):
     assert caplog.messages == []
 
 
[email protected]_serialized_dag(False)
 @pytest.mark.skip_if_database_isolation_mode  # Test is broken in db isolation mode
 def test_task_decorator_dataset(dag_maker, session):
     from airflow.datasets import Dataset
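
Several decorator test modules above replace the single db_test marker with a marker list, while individual tests opt out again with need_serialized_dag(False). A minimal sketch of the two forms follows; the semantics of need_serialized_dag (that dag_maker hands the test a serialized DAG) are inferred from the marker name and the surrounding changes, not defined in this diff:

    import pytest

    # Module level: every test in the file is a DB test and gets a serialized DAG.
    pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag]


    @pytest.mark.need_serialized_dag(False)  # a single test opting back out
    def test_needs_the_unserialized_dag():
        ...
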
diff --git a/tests/decorators/test_python_virtualenv.py b/tests/decorators/test_python_virtualenv.py
index 554b33ceb9..57a096ef19 100644
--- a/tests/decorators/test_python_virtualenv.py
+++ b/tests/decorators/test_python_virtualenv.py
@@ -30,7 +30,7 @@ from airflow.exceptions import RemovedInAirflow3Warning
 from airflow.utils import timezone
 from airflow.utils.state import TaskInstanceState
 
-pytestmark = pytest.mark.db_test
+pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag]
 
 DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 PYTHON_VERSION = f"{sys.version_info.major}{sys.version_info.minor}"
@@ -373,6 +373,8 @@ class TestPythonVirtualenvDecorator:
         assert teardown_task.on_failure_fail_dagrun is on_failure_fail_dagrun
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
+    # TODO(potiuk) see why this test hangs in DB isolation mode
+    @pytest.mark.skip_if_database_isolation_mode
     def test_invalid_annotation(self, dag_maker):
         import uuid
 
diff --git a/tests/decorators/test_sensor.py b/tests/decorators/test_sensor.py
index e970894a38..a283ed871b 100644
--- a/tests/decorators/test_sensor.py
+++ b/tests/decorators/test_sensor.py
@@ -26,7 +26,7 @@ from airflow.models import XCom
 from airflow.sensors.base import PokeReturnValue
 from airflow.utils.state import State
 
-pytestmark = pytest.mark.db_test
+pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag]
 
 
 class TestSensorDecorator:
@@ -52,6 +52,7 @@ class TestSensorDecorator:
             sf >> df
 
         dr = dag_maker.create_dagrun()
+
         sf.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True)
         tis = dr.get_task_instances()
         assert len(tis) == 2
diff --git a/tests/decorators/test_short_circuit.py b/tests/decorators/test_short_circuit.py
index 1d43de6842..1c8349b6c9 100644
--- a/tests/decorators/test_short_circuit.py
+++ b/tests/decorators/test_short_circuit.py
@@ -24,7 +24,7 @@ from airflow.decorators import task
 from airflow.utils.state import State
 from airflow.utils.trigger_rule import TriggerRule
 
-pytestmark = pytest.mark.db_test
+pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
 
 
 DEFAULT_DATE = datetime(2022, 8, 17)
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index c2993e9ce8..4516686a67 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1443,7 +1443,10 @@ class TestTaskInstance:
     # Parameterized tests to check for the correct firing
     # of the trigger_rule under various circumstances of mapped task
     # Numeric fields are in order:
-    #   successes, skipped, failed, upstream_failed, done,removed
+    #   successes, skipped, failed, upstream_failed, done, removed
+    # Does not work for database isolation mode because there is local test monkeypatching of upstream_failed
+    # That never gets propagated to internal_api
+    @pytest.mark.skip_if_database_isolation_mode
     @pytest.mark.parametrize(
         "trigger_rule, upstream_states, flag_upstream_failed, expect_state, expect_completed",
         [
@@ -1539,8 +1542,10 @@ class TestTaskInstance:
         monkeypatch.setattr(_UpstreamTIStates, "calculate", lambda *_: upstream_states)
         ti = dr.get_task_instance("do_something_else", session=session)
         ti.map_index = 0
+        base_task = ti.task
+
         for map_index in range(1, 5):
-            ti = TaskInstance(dr.task_instances[-1].task, run_id=dr.run_id, map_index=map_index)
+            ti = TaskInstance(base_task, run_id=dr.run_id, map_index=map_index)
             session.add(ti)
             ti.dag_run = dr
         session.flush()
diff --git a/tests/models/test_variable.py b/tests/models/test_variable.py
index 3ec2691e5a..6fb6fa15f2 100644
--- a/tests/models/test_variable.py
+++ b/tests/models/test_variable.py
@@ -30,7 +30,7 @@ from airflow.secrets.metastore import MetastoreBackend
 from tests.test_utils import db
 from tests.test_utils.config import conf_vars
 
-pytestmark = pytest.mark.db_test
+pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
 
 
 class TestVariable:
@@ -47,6 +47,7 @@ class TestVariable:
         db.clear_db_variables()
         crypto._fernet = None
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode, internal API has other fernet
     @conf_vars({("core", "fernet_key"): "", ("core", "unit_test_mode"): "True"})
     def test_variable_no_encryption(self, session):
         """
@@ -60,6 +61,7 @@ class TestVariable:
         # should mask anything. That logic is tested in test_secrets_masker.py
         self.mask_secret.assert_called_once_with("value", "key")
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode, internal API has other fernet
     @conf_vars({("core", "fernet_key"): Fernet.generate_key().decode()})
     def test_variable_with_encryption(self, session):
         """
@@ -70,6 +72,7 @@ class TestVariable:
         assert test_var.is_encrypted
         assert test_var.val == "value"
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode, internal API has other fernet
     @pytest.mark.parametrize("test_value", ["value", ""])
     def test_var_with_encryption_rotate_fernet_key(self, test_value, session):
         """
@@ -152,6 +155,7 @@ class TestVariable:
         Variable.update(key="test_key", value="value2", session=session)
         assert "value2" == Variable.get("test_key")
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode, API server has other ENV
     def test_variable_update_fails_on_non_metastore_variable(self, session):
         with mock.patch.dict("os.environ", AIRFLOW_VAR_KEY="env-value"):
             with pytest.raises(AttributeError):
@@ -281,6 +285,7 @@ class TestVariable:
         mock_backend.get_variable.assert_called_once()  # second call was not made because of cache
         assert first == second
 
+    @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode, internal API has other env
     def test_cache_invalidation_on_set(self, session):
         with mock.patch.dict("os.environ", AIRFLOW_VAR_KEY="from_env"):
             a = Variable.get("key")  # value is saved in cache
@@ -316,7 +321,7 @@ def test_masking_only_secret_values(variable_value, deserialize_json, expected_m
             val=variable_value,
         )
         session.add(var)
-        session.flush()
+        session.commit()
         # Make sure we re-load it, not just get the cached object back
         session.expunge(var)
         _secrets_masker().patterns = set()
@@ -326,5 +331,4 @@ def test_masking_only_secret_values(variable_value, deserialize_json, expected_m
         for expected_masked_value in expected_masked_values:
             assert expected_masked_value in _secrets_masker().patterns
     finally:
-        session.rollback()
         db.clear_db_variables()
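
The flush-to-commit change in test_masking_only_secret_values (and the dropped rollback in the cleanup) follows from transaction scoping: a flush writes rows only inside the caller's still-open transaction, so a second session, such as the internal API server reading variables in isolation mode, cannot see them until commit. A rough standalone illustration with plain sqlite3 (file name and schema are made up):

    import os
    import sqlite3
    import tempfile

    path = os.path.join(tempfile.mkdtemp(), "demo.db")
    writer = sqlite3.connect(path)
    reader = sqlite3.connect(path)

    writer.execute("CREATE TABLE variable (val TEXT)")
    writer.commit()

    # Like flush(): the row exists only inside the writer's open transaction.
    writer.execute("INSERT INTO variable VALUES ('secret')")
    assert reader.execute("SELECT COUNT(*) FROM variable").fetchone()[0] == 0

    # Only after commit() can another connection (or process) read the row.
    writer.commit()
    assert reader.execute("SELECT COUNT(*) FROM variable").fetchone()[0] == 1
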
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index 993d70cad3..f242812751 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -1306,6 +1306,9 @@ class TestPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
         "AssertRewritingHook including captured stdout and we need to run "
         "it with `--assert=plain` pytest option and PYTEST_PLAIN_ASSERTS=true .",
     )
+    # TODO(potiuk) check if this can be fixed in the future - for now we are skipping tests with venv
+    # and airflow context in DB isolation mode as they are passing None as DAG.
+    @pytest.mark.skip_if_database_isolation_mode
     def test_airflow_context(self, serializer):
         def f(
             # basic
diff --git a/tests/providers/opensearch/conftest.py b/tests/providers/opensearch/conftest.py
index 47a447188e..934bbd642a 100644
--- a/tests/providers/opensearch/conftest.py
+++ b/tests/providers/opensearch/conftest.py
@@ -19,12 +19,19 @@ from __future__ import annotations
 from typing import Any
 
 import pytest
-from opensearchpy import OpenSearch
 
+from airflow.hooks.base import BaseHook
 from airflow.models import Connection
-from airflow.providers.opensearch.hooks.opensearch import OpenSearchHook
 from airflow.utils import db
 
+try:
+    from opensearchpy import OpenSearch
+
+    from airflow.providers.opensearch.hooks.opensearch import OpenSearchHook
+except ImportError:
+    OpenSearch = None  # type: ignore[assignment, misc]
+    OpenSearchHook = BaseHook  # type: ignore[assignment,misc]
+
 # TODO: FIXME - those Mocks have overrides that are not used but they also do not make Mypy Happy
 # mypy: disable-error-code="override"
 
diff --git a/tests/providers/opensearch/hooks/test_opensearch.py b/tests/providers/opensearch/hooks/test_opensearch.py
index 84360ae73f..43075e8532 100644
--- a/tests/providers/opensearch/hooks/test_opensearch.py
+++ b/tests/providers/opensearch/hooks/test_opensearch.py
@@ -18,8 +18,9 @@ from __future__ import annotations
 
 from unittest import mock
 
-import opensearchpy
 import pytest
+
+opensearchpy = pytest.importorskip("opensearchpy")
 from opensearchpy import Urllib3HttpConnection
 
 from airflow.exceptions import AirflowException
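
The opensearch changes here make the provider's test package importable without the opensearchpy client: the conftest above falls back to placeholders so its fixtures can still be defined, while this hook test module (and the operator tests below) skip themselves at collection time. A condensed sketch of the two alternatives, using the same names as the diff:

    import pytest

    # Alternative used in the hook/operator test modules: skip the whole module
    # during collection when the client library is not installed.
    opensearchpy = pytest.importorskip("opensearchpy")

    # Alternative used in conftest.py: keep the module importable and fall back
    # to placeholders, leaving the skipping to the test modules themselves.
    try:
        from opensearchpy import OpenSearch
    except ImportError:
        OpenSearch = None  # type: ignore[assignment, misc]
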
diff --git a/tests/providers/opensearch/operators/test_opensearch.py b/tests/providers/opensearch/operators/test_opensearch.py
index 706112fef6..63ad7eafe4 100644
--- a/tests/providers/opensearch/operators/test_opensearch.py
+++ b/tests/providers/opensearch/operators/test_opensearch.py
@@ -17,6 +17,9 @@
 from __future__ import annotations
 
 import pytest
+
+opensearchpy = pytest.importorskip("opensearchpy")
+
 from opensearchpy import Document, Keyword, Text
 
 from airflow.models import DAG
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index fbebd3d120..e7a5991963 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -809,6 +809,7 @@ exit 0
                 dag=self.dag,
             )
 
+    @pytest.mark.skip_if_database_isolation_mode  # Test is broken in db isolation mode
     def test_external_task_sensor_waits_for_task_check_existence(self):
         op = ExternalTaskSensor(
             task_id="test_external_task_sensor_check",
@@ -821,6 +822,7 @@ exit 0
         with pytest.raises(AirflowException):
             op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
+    @pytest.mark.skip_if_database_isolation_mode  # Test is broken in db isolation mode
     def test_external_task_sensor_waits_for_dag_check_existence(self):
         op = ExternalTaskSensor(
             task_id="test_external_task_sensor_check",
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index e9c8ceaf03..d1c6787db3 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -399,6 +399,8 @@ def timetable_plugin(monkeypatch):
     )
 
 
+# TODO: (potiuk) - AIP-44 - check why this test hangs
[email protected]_if_database_isolation_mode
 class TestStringifiedDAGs:
     """Unit tests for stringified DAGs."""
 
diff --git a/tests/www/views/test_views_trigger_dag.py b/tests/www/views/test_views_trigger_dag.py
index c53213c3e6..9b2b297198 100644
--- a/tests/www/views/test_views_trigger_dag.py
+++ b/tests/www/views/test_views_trigger_dag.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 
 import datetime
 import json
+from decimal import Decimal
 from urllib.parse import quote
 
 import pytest
@@ -28,6 +29,7 @@ from airflow.models.param import Param
 from airflow.operators.empty import EmptyOperator
 from airflow.security import permissions
 from airflow.utils import timezone
+from airflow.utils.json import WebEncoder
 from airflow.utils.session import create_session
 from airflow.utils.types import DagRunType
 from tests.test_utils.api_connexion_utils import create_test_client
@@ -92,6 +94,32 @@ def test_trigger_dag_conf(admin_client):
     assert run.conf == conf_dict
 
 
+def test_trigger_dag_conf_serializable_fields(admin_client):
+    test_dag_id = "example_bash_operator"
+    time_now = timezone.utcnow()
+    conf_dict = {
+        "string": "Hello, World!",
+        "date_str": "2024-08-08T09:57:35.300858",
+        "datetime": time_now,
+        "decimal": Decimal(10.465),
+    }
+    expected_conf = {
+        "string": "Hello, World!",
+        "date_str": "2024-08-08T09:57:35.300858",
+        "datetime": time_now.isoformat(),
+        "decimal": 10.465,
+    }
+
+    admin_client.post(f"dags/{test_dag_id}/trigger", data={"conf": json.dumps(conf_dict, cls=WebEncoder)})
+
+    with create_session() as session:
+        run = session.query(DagRun).filter(DagRun.dag_id == test_dag_id).first()
+    assert run is not None
+    assert DagRunType.MANUAL in run.run_id
+    assert run.run_type == DagRunType.MANUAL
+    assert run.conf == expected_conf
+
+
 def test_trigger_dag_conf_malformed(admin_client):
     test_dag_id = "example_bash_operator"
 

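For context on the new test_trigger_dag_conf_serializable_fields case above: it relies on WebEncoder making the conf JSON-serializable before it is posted to the trigger endpoint. A standalone illustration of the same call, with made-up values shaped like the ones in the test:

    import json
    from datetime import datetime, timezone
    from decimal import Decimal

    from airflow.utils.json import WebEncoder

    conf = {
        "datetime": datetime(2024, 8, 8, 9, 57, 35, tzinfo=timezone.utc),
        "decimal": Decimal("10.465"),
    }

    # json.dumps(conf) alone raises TypeError; with WebEncoder the datetime is
    # rendered as an ISO-8601 string and the Decimal as a plain number, which is
    # what expected_conf asserts in the test above.
    print(json.dumps(conf, cls=WebEncoder))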