This is an automated email from the ASF dual-hosted git repository.
utkarsharma pushed a commit to branch v2-10-stable
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-stable by this push:
new 91f8265b68 Sync v2-10-stable with v2-10-test to release python client
v2.10.0 (#41610)
91f8265b68 is described below
commit 91f8265b6820eca747634dce39ae0d70aada2640
Author: Utkarsh Sharma <[email protected]>
AuthorDate: Thu Aug 22 14:30:09 2024 +0530
Sync v2-10-stable with v2-10-test to release python client v2.10.0 (#41610)
* Enable pull requests to be run from v*test branches (#41474) (#41476)
Since we switch from direct push of cherry-picking to open PRs
against v*test branch, we should enable PRs to run for the target
branch.
(cherry picked from commit a9363e6a30d73a647ed7d45c92d46d1f6f98513f)
* Prevent provider lowest-dependency tests to run in non-main branch
(#41478) (#41481)
When running tests in v2-10-test branch, lowest depenency tests
are run for providers - because when calculating separate tests,
the "skip_provider_tests" has not been used to filter them out.
This PR fixes it.
(cherry picked from commit 75da5074969ec874040ea094d5afe00b7f02be76)
* Make PROD image building works in non-main PRs (#41480) (#41484)
The PROD image building fails currently in non-main because it
attempts to build source provider packages rather than use them from
PyPi when PR is run against "v-test" branch.
This PR fixes it:
* PROD images in non-main-targetted build will pull providers from
PyPI rather than build them
* they use PyPI constraints to install the providers
* they use UV - which should speed up building of the images
(cherry picked from commit 4d5f1c42a7873329b1b6b8b9b39db2c3033b46df)
* Add WebEncoder for trigger page rendering to avoid render failure
(#41350) (#41485)
Co-authored-by: M. Olcay Tercanlı <[email protected]>
* Incorrect try number subtraction producing invalid span id for OTEL
airflow (issue #41501) (#41502) (#41535)
* Fix for issue #39336
* removed unnecessary import
(cherry picked from commit dd3c3a7a43102c967d76cdcfe1f2f8ebeef4e212)
Co-authored-by: Howard Yoo <[email protected]>
* Fix failing pydantic v1 tests (#41534) (#41541)
We need to exclude some versions of Pydantic v1 because it conflicts
with aws provider.
(cherry picked from commit a033c5f15a033c751419506ea77ffdbacdd37705)
* Fix Non-DB test calculation for main builds (#41499) (#41543)
Pytest has a weird behaviour that it will not collect tests
from parent folder when subfolder of it is specified after the
parent folder. This caused some non-db tests from providers folder
have been skipped during main build.
The issue in Pytest 8.2 (used to work before) is tracked at
https://github.com/pytest-dev/pytest/issues/12605
(cherry picked from commit d48982692c54d024d7c05e1efb7cd2adeb7d896c)
* Add changelog for airflow python client 2.10.0 (#41583) (#41584)
* Add changelog for airflow python client 2.10.0
* Update client version
(cherry picked from commit 317a28ed435960e7184e357a2f128806c34612fa)
* Make all test pass in Database Isolation mode (#41567)
This adds dedicated "DatabaseIsolation" test to airflow v2-10-test
branch..
The DatabaseIsolation test will run all "db-tests" with enabled
DB isolation mode and running `internal-api` component - groups
of tests marked with "skip-if-database-isolation" will be skipped.
* Upgrade build and chart dependencies (#41570) (#41588)
(cherry picked from commit c88192c466cb91842310f82a61eaa48b39439bef)
Co-authored-by: Jarek Potiuk <[email protected]>
* Limit watchtower as depenendcy as 3.3.0 breaks moin. (#41612)
(cherry picked from commit 1b602d50266184d118db52a674baeab29b1f5688)
* Enable running Pull Requests against v2-10-stable branch (#41624)
(cherry picked from commit e306e7f7bc1ef12aeab0fc09e018accda3684a2f)
* Fix tests/models/test_variable.py for database isolation mode (#41414)
* Fix tests/models/test_variable.py for database isolation mode
* Review feedback
(cherry picked from commit 736ebfe3fe2bd67406d5a50dacbfa1e43767d4ce)
* Make latest botocore tests green (#41626)
The latest botocore tests are conflicting with a few requirements
and until apache-beam upcoming version is released we need to do
some manual exclusions. Those exclusions should make latest botocore
test green again.
(cherry picked from commit a13ccbbdec8e59f30218f604fca8cbb999fcb757)
* Simpler task retrieval for taskinstance test (#41389)
The test has been updated for DB isolation but the retrieval of
task was not intuitive and it could lead to flaky tests possibly
(cherry picked from commit f25adf14ad486bac818fe3fdcd61eb3355e8ec9b)
* Skip database isolation case for task mapping taskinstance tests (#41471)
Related: #41067
(cherry picked from commit 7718bd7a6ed7fb476e4920315b6d11f1ac465f44)
* Skipping tests for db isolation because similar tests were skipped
(#41450)
(cherry picked from commit e94b508b946471420488cc466d92301b54b4c5ae)
---------
Co-authored-by: Jarek Potiuk <[email protected]>
Co-authored-by: Brent Bovenzi <[email protected]>
Co-authored-by: M. Olcay Tercanlı <[email protected]>
Co-authored-by: Howard Yoo <[email protected]>
Co-authored-by: Jens Scheffler <[email protected]>
Co-authored-by: Bugra Ozturk <[email protected]>
---
.github/workflows/ci.yml | 2 +-
.github/workflows/run-unit-tests.yml | 6 ++
.github/workflows/special-tests.yml | 23 ++++++++
Dockerfile | 2 +-
Dockerfile.ci | 16 +++---
airflow/api_internal/endpoints/rpc_api_endpoint.py | 15 ++---
airflow/api_internal/internal_api_call.py | 2 +-
airflow/executors/base_executor.py | 2 +-
airflow/executors/local_executor.py | 2 +-
airflow/executors/sequential_executor.py | 2 +-
airflow/jobs/scheduler_job_runner.py | 2 +-
airflow/models/baseoperator.py | 3 +-
airflow/models/variable.py | 66 +++++++++++++++++++++-
airflow/providers/amazon/provider.yaml | 2 +-
airflow/serialization/enums.py | 1 +
airflow/serialization/serialized_objects.py | 16 +++++-
airflow/settings.py | 51 ++++++++++++-----
airflow/traces/utils.py | 7 +--
airflow/www/views.py | 3 +-
chart/values.schema.json | 2 +-
chart/values.yaml | 2 +-
clients/python/CHANGELOG.md | 20 +++++++
clients/python/version.txt | 2 +-
.../airflow_breeze/commands/testing_commands.py | 2 +
.../templates/CHANGELOG_TEMPLATE.rst.jinja2 | 6 +-
dev/breeze/src/airflow_breeze/utils/run_tests.py | 18 ++++--
.../src/airflow_breeze/utils/selective_checks.py | 4 ++
.../tests/test_pytest_args_for_test_types.py | 13 +++++
dev/breeze/tests/test_selective_checks.py | 5 +-
generated/provider_dependencies.json | 2 +-
pyproject.toml | 3 +
scripts/docker/entrypoint_ci.sh | 12 ++--
tests/always/test_example_dags.py | 20 ++-----
tests/cli/commands/test_internal_api_command.py | 3 +
tests/cli/commands/test_webserver_command.py | 3 +
tests/decorators/test_bash.py | 3 +
tests/decorators/test_branch_external_python.py | 3 +-
tests/decorators/test_branch_python.py | 3 +-
tests/decorators/test_branch_virtualenv.py | 3 +-
tests/decorators/test_condition.py | 3 +-
tests/decorators/test_external_python.py | 2 +-
tests/decorators/test_python.py | 11 +++-
tests/decorators/test_python_virtualenv.py | 4 +-
tests/decorators/test_sensor.py | 3 +-
tests/decorators/test_short_circuit.py | 2 +-
tests/models/test_taskinstance.py | 9 ++-
tests/models/test_variable.py | 10 +++-
tests/operators/test_python.py | 3 +
tests/providers/opensearch/conftest.py | 11 +++-
.../providers/opensearch/hooks/test_opensearch.py | 3 +-
.../opensearch/operators/test_opensearch.py | 3 +
tests/sensors/test_external_task_sensor.py | 2 +
tests/serialization/test_dag_serialization.py | 2 +
tests/www/views/test_views_trigger_dag.py | 28 +++++++++
54 files changed, 350 insertions(+), 98 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index a1fa32e3ad..866f8f253d 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -23,7 +23,7 @@ on: # yamllint disable-line rule:truthy
push:
branches: ['v[0-9]+-[0-9]+-test']
pull_request:
- branches: ['main']
+ branches: ['main', 'v[0-9]+-[0-9]+-test', 'v[0-9]+-[0-9]+-stable']
workflow_dispatch:
permissions:
# All other permissions are set to none
diff --git a/.github/workflows/run-unit-tests.yml
b/.github/workflows/run-unit-tests.yml
index 7828e50ed7..2989a952d9 100644
--- a/.github/workflows/run-unit-tests.yml
+++ b/.github/workflows/run-unit-tests.yml
@@ -104,6 +104,11 @@ on: # yamllint disable-line rule:truthy
required: false
default: "true"
type: string
+ database-isolation:
+ description: "Whether to enable database isolattion or not
(true/false)"
+ required: false
+ default: "false"
+ type: string
force-lowest-dependencies:
description: "Whether to force lowest dependencies for the tests or
not (true/false)"
required: false
@@ -152,6 +157,7 @@ jobs:
PYTHON_MAJOR_MINOR_VERSION: "${{ matrix.python-version }}"
UPGRADE_BOTO: "${{ inputs.upgrade-boto }}"
AIRFLOW_MONITOR_DELAY_TIME_IN_SECONDS:
"${{inputs.monitor-delay-time-in-seconds}}"
+ DATABASE_ISOLATION: "${{ inputs.database-isolation }}"
VERBOSE: "true"
steps:
- name: "Cleanup repo"
diff --git a/.github/workflows/special-tests.yml
b/.github/workflows/special-tests.yml
index 000b5aa3d9..e09b813acf 100644
--- a/.github/workflows/special-tests.yml
+++ b/.github/workflows/special-tests.yml
@@ -193,6 +193,29 @@ jobs:
run-coverage: ${{ inputs.run-coverage }}
debug-resources: ${{ inputs.debug-resources }}
+ tests-database-isolation:
+ name: "Database isolation test"
+ uses: ./.github/workflows/run-unit-tests.yml
+ permissions:
+ contents: read
+ packages: read
+ secrets: inherit
+ with:
+ runs-on-as-json-default: ${{ inputs.runs-on-as-json-default }}
+ enable-aip-44: "true"
+ database-isolation: "true"
+ test-name: "DatabaseIsolation-Postgres"
+ test-scope: "DB"
+ backend: "postgres"
+ image-tag: ${{ inputs.image-tag }}
+ python-versions: "['${{ inputs.default-python-version }}']"
+ backend-versions: "['${{ inputs.default-postgres-version }}']"
+ excludes: "[]"
+ parallel-test-types-list-as-string: ${{
inputs.parallel-test-types-list-as-string }}
+ include-success-outputs: ${{
needs.build-info.outputs.include-success-outputs }}
+ run-coverage: ${{ inputs.run-coverage }}
+ debug-resources: ${{ inputs.debug-resources }}
+
tests-quarantined:
name: "Quarantined test"
uses: ./.github/workflows/run-unit-tests.yml
diff --git a/Dockerfile b/Dockerfile
index 8c4a43274f..5eb2b33556 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -50,7 +50,7 @@ ARG AIRFLOW_VERSION="2.9.3"
ARG PYTHON_BASE_IMAGE="python:3.8-slim-bookworm"
ARG AIRFLOW_PIP_VERSION=24.2
-ARG AIRFLOW_UV_VERSION=0.2.34
+ARG AIRFLOW_UV_VERSION=0.2.37
ARG AIRFLOW_USE_UV="false"
ARG UV_HTTP_TIMEOUT="300"
ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow"
diff --git a/Dockerfile.ci b/Dockerfile.ci
index 14ccb669f6..f46890cd81 100644
--- a/Dockerfile.ci
+++ b/Dockerfile.ci
@@ -1022,16 +1022,17 @@ function check_boto_upgrade() {
echo "${COLOR_BLUE}Upgrading boto3, botocore to latest version to run
Amazon tests with them${COLOR_RESET}"
echo
# shellcheck disable=SC2086
- ${PACKAGING_TOOL_CMD} uninstall ${EXTRA_UNINSTALL_FLAGS} aiobotocore s3fs
yandexcloud || true
+ ${PACKAGING_TOOL_CMD} uninstall ${EXTRA_UNINSTALL_FLAGS} aiobotocore s3fs
yandexcloud opensearch-py || true
# We need to include few dependencies to pass pip check with other
dependencies:
# * oss2 as dependency as otherwise jmespath will be bumped (sync with
alibaba provider)
- # * gcloud-aio-auth limit is needed to be included as it bumps
cryptography (sync with google provider)
+ # * cryptography is kept for snowflake-connector-python limitation (sync
with snowflake provider)
# * requests needs to be limited to be compatible with apache beam (sync
with apache-beam provider)
# * yandexcloud requirements for requests does not match those of
apache.beam and latest botocore
# Both requests and yandexcloud exclusion above might be removed after
# https://github.com/apache/beam/issues/32080 is addressed
- # When you remove yandexcloud from the above list, also remove it from
"test_example_dags.py"
- # in "tests/always".
+ # This is already addressed and planned for 2.59.0 release.
+ # When you remove yandexcloud and opensearch from the above list, you
can also remove the
+ # optional providers_dependencies exclusions from "test_example_dags.py"
in "tests/always".
set -x
# shellcheck disable=SC2086
${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade boto3
botocore \
@@ -1068,8 +1069,9 @@ function check_pydantic() {
echo
echo "${COLOR_YELLOW}Downgrading Pydantic to < 2${COLOR_RESET}"
echo
+ # Pydantic 1.10.17/1.10.15 conflicts with aws-sam-translator so we
need to exclude it
# shellcheck disable=SC2086
- ${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade
"pydantic<2.0.0"
+ ${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade
"pydantic<2.0.0,!=1.10.17,!=1.10.15"
pip check
else
echo
@@ -1310,7 +1312,7 @@ ARG DEFAULT_CONSTRAINTS_BRANCH="constraints-main"
ARG AIRFLOW_CI_BUILD_EPOCH="10"
ARG AIRFLOW_PRE_CACHED_PIP_PACKAGES="true"
ARG AIRFLOW_PIP_VERSION=24.2
-ARG AIRFLOW_UV_VERSION=0.2.34
+ARG AIRFLOW_UV_VERSION=0.2.37
ARG AIRFLOW_USE_UV="true"
# Setup PIP
# By default PIP install run without cache to make image smaller
@@ -1334,7 +1336,7 @@ ARG AIRFLOW_VERSION=""
ARG ADDITIONAL_PIP_INSTALL_FLAGS=""
ARG AIRFLOW_PIP_VERSION=24.2
-ARG AIRFLOW_UV_VERSION=0.2.34
+ARG AIRFLOW_UV_VERSION=0.2.37
ARG AIRFLOW_USE_UV="true"
ENV AIRFLOW_REPO=${AIRFLOW_REPO}\
diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py
b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index ad65157ef9..c3d8b671fb 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -126,9 +126,9 @@ def initialize_method_map() -> dict[str, Callable]:
# XCom.get_many, # Not supported because it returns query
XCom.clear,
XCom.set,
- Variable.set,
- Variable.update,
- Variable.delete,
+ Variable._set,
+ Variable._update,
+ Variable._delete,
DAG.fetch_callback,
DAG.fetch_dagrun,
DagRun.fetch_task_instances,
@@ -228,19 +228,20 @@ def internal_airflow_api(body: dict[str, Any]) ->
APIResponse:
except Exception:
return log_and_build_error_response(message="Error deserializing
parameters.", status=400)
- log.info("Calling method %s\nparams: %s", method_name, params)
+ log.debug("Calling method %s\nparams: %s", method_name, params)
try:
# Session must be created there as it may be needed by serializer for
lazy-loaded fields.
with create_session() as session:
output = handler(**params, session=session)
output_json = BaseSerialization.serialize(output,
use_pydantic_models=True)
response = json.dumps(output_json) if output_json is not None else
None
- log.info("Sending response: %s", response)
+ log.debug("Sending response: %s", response)
return Response(response=response, headers={"Content-Type":
"application/json"})
- except AirflowException as e: # In case of AirflowException transport the
exception class back to caller
+ # In case of AirflowException or other selective known types, transport
the exception class back to caller
+ except (KeyError, AttributeError, AirflowException) as e:
exception_json = BaseSerialization.serialize(e,
use_pydantic_models=True)
response = json.dumps(exception_json)
- log.info("Sending exception response: %s", response)
+ log.debug("Sending exception response: %s", response)
return Response(response=response, headers={"Content-Type":
"application/json"})
except Exception:
return log_and_build_error_response(message=f"Error executing method
'{method_name}'.", status=500)
diff --git a/airflow/api_internal/internal_api_call.py
b/airflow/api_internal/internal_api_call.py
index fc0945b3c0..8838377877 100644
--- a/airflow/api_internal/internal_api_call.py
+++ b/airflow/api_internal/internal_api_call.py
@@ -159,7 +159,7 @@ def internal_api_call(func: Callable[PS, RT]) ->
Callable[PS, RT]:
if result is None or result == b"":
return None
result = BaseSerialization.deserialize(json.loads(result),
use_pydantic_models=True)
- if isinstance(result, AirflowException):
+ if isinstance(result, (KeyError, AttributeError, AirflowException)):
raise result
return result
diff --git a/airflow/executors/base_executor.py
b/airflow/executors/base_executor.py
index dd0b8a66d2..57568af199 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -467,7 +467,7 @@ class BaseExecutor(LoggingMixin):
span.set_attribute("dag_id", key.dag_id)
span.set_attribute("run_id", key.run_id)
span.set_attribute("task_id", key.task_id)
- span.set_attribute("try_number", key.try_number - 1)
+ span.set_attribute("try_number", key.try_number)
self.change_state(key, TaskInstanceState.SUCCESS, info)
diff --git a/airflow/executors/local_executor.py
b/airflow/executors/local_executor.py
index afa51b1d86..32bba42082 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -277,7 +277,7 @@ class LocalExecutor(BaseExecutor):
span.set_attribute("dag_id", key.dag_id)
span.set_attribute("run_id", key.run_id)
span.set_attribute("task_id", key.task_id)
- span.set_attribute("try_number", key.try_number - 1)
+ span.set_attribute("try_number", key.try_number)
span.set_attribute("commands_to_run", str(command))
local_worker = LocalWorker(self.executor.result_queue, key=key,
command=command)
diff --git a/airflow/executors/sequential_executor.py
b/airflow/executors/sequential_executor.py
index 1b145892eb..5e9542d915 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -76,7 +76,7 @@ class SequentialExecutor(BaseExecutor):
span.set_attribute("dag_id", key.dag_id)
span.set_attribute("run_id", key.run_id)
span.set_attribute("task_id", key.task_id)
- span.set_attribute("try_number", key.try_number - 1)
+ span.set_attribute("try_number", key.try_number)
span.set_attribute("commands_to_run", str(self.commands_to_run))
def sync(self) -> None:
diff --git a/airflow/jobs/scheduler_job_runner.py
b/airflow/jobs/scheduler_job_runner.py
index 163bf5b714..ba5f90c68b 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -837,7 +837,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
span.set_attribute("hostname", ti.hostname)
span.set_attribute("log_url", ti.log_url)
span.set_attribute("operator", str(ti.operator))
- span.set_attribute("try_number", ti.try_number - 1)
+ span.set_attribute("try_number", ti.try_number)
span.set_attribute("executor_state", state)
span.set_attribute("job_id", ti.job_id)
span.set_attribute("pool", ti.pool)
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 7ffa596ec6..f21db0c675 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -1516,10 +1516,11 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
data_interval=info.data_interval,
)
ti = TaskInstance(self, run_id=dr.run_id)
+ session.add(ti)
ti.dag_run = dr
session.add(dr)
session.flush()
-
+ session.commit()
ti.run(
mark_success=mark_success,
ignore_depends_on_past=ignore_depends_on_past,
diff --git a/airflow/models/variable.py b/airflow/models/variable.py
index 63b71303bc..563cac46e8 100644
--- a/airflow/models/variable.py
+++ b/airflow/models/variable.py
@@ -154,7 +154,6 @@ class Variable(Base, LoggingMixin):
@staticmethod
@provide_session
- @internal_api_call
def set(
key: str,
value: Any,
@@ -167,6 +166,35 @@ class Variable(Base, LoggingMixin):
This operation overwrites an existing variable.
+ :param key: Variable Key
+ :param value: Value to set for the Variable
+ :param description: Description of the Variable
+ :param serialize_json: Serialize the value to a JSON string
+ :param session: Session
+ """
+ Variable._set(
+ key=key, value=value, description=description,
serialize_json=serialize_json, session=session
+ )
+ # invalidate key in cache for faster propagation
+ # we cannot save the value set because it's possible that it's
shadowed by a custom backend
+ # (see call to check_for_write_conflict above)
+ SecretCache.invalidate_variable(key)
+
+ @staticmethod
+ @provide_session
+ @internal_api_call
+ def _set(
+ key: str,
+ value: Any,
+ description: str | None = None,
+ serialize_json: bool = False,
+ session: Session = None,
+ ) -> None:
+ """
+ Set a value for an Airflow Variable with a given Key.
+
+ This operation overwrites an existing variable.
+
:param key: Variable Key
:param value: Value to set for the Variable
:param description: Description of the Variable
@@ -190,7 +218,6 @@ class Variable(Base, LoggingMixin):
@staticmethod
@provide_session
- @internal_api_call
def update(
key: str,
value: Any,
@@ -200,6 +227,27 @@ class Variable(Base, LoggingMixin):
"""
Update a given Airflow Variable with the Provided value.
+ :param key: Variable Key
+ :param value: Value to set for the Variable
+ :param serialize_json: Serialize the value to a JSON string
+ :param session: Session
+ """
+ Variable._update(key=key, value=value, serialize_json=serialize_json,
session=session)
+ # We need to invalidate the cache for internal API cases on the client
side
+ SecretCache.invalidate_variable(key)
+
+ @staticmethod
+ @provide_session
+ @internal_api_call
+ def _update(
+ key: str,
+ value: Any,
+ serialize_json: bool = False,
+ session: Session = None,
+ ) -> None:
+ """
+ Update a given Airflow Variable with the Provided value.
+
:param key: Variable Key
:param value: Value to set for the Variable
:param serialize_json: Serialize the value to a JSON string
@@ -219,11 +267,23 @@ class Variable(Base, LoggingMixin):
@staticmethod
@provide_session
- @internal_api_call
def delete(key: str, session: Session = None) -> int:
"""
Delete an Airflow Variable for a given key.
+ :param key: Variable Keys
+ """
+ rows = Variable._delete(key=key, session=session)
+ SecretCache.invalidate_variable(key)
+ return rows
+
+ @staticmethod
+ @provide_session
+ @internal_api_call
+ def _delete(key: str, session: Session = None) -> int:
+ """
+ Delete an Airflow Variable for a given key.
+
:param key: Variable Keys
"""
rows = session.execute(delete(Variable).where(Variable.key ==
key)).rowcount
diff --git a/airflow/providers/amazon/provider.yaml
b/airflow/providers/amazon/provider.yaml
index c69542ef33..1c6a86f180 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -101,7 +101,7 @@ dependencies:
- botocore>=1.34.90
- inflection>=0.5.1
# Allow a wider range of watchtower versions for flexibility among users
- - watchtower>=3.0.0,<4
+ - watchtower>=3.0.0,!=3.3.0,<4
- jsonpath_ng>=1.5.3
- redshift_connector>=2.0.918
- sqlalchemy_redshift>=0.8.6
diff --git a/airflow/serialization/enums.py b/airflow/serialization/enums.py
index a5bd5e3646..f216ce7316 100644
--- a/airflow/serialization/enums.py
+++ b/airflow/serialization/enums.py
@@ -46,6 +46,7 @@ class DagAttributeTypes(str, Enum):
RELATIVEDELTA = "relativedelta"
BASE_TRIGGER = "base_trigger"
AIRFLOW_EXC_SER = "airflow_exc_ser"
+ BASE_EXC_SER = "base_exc_ser"
DICT = "dict"
SET = "set"
TUPLE = "tuple"
diff --git a/airflow/serialization/serialized_objects.py
b/airflow/serialization/serialized_objects.py
index d110271c3d..a3886aa49a 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -692,6 +692,15 @@ class BaseSerialization:
),
type_=DAT.AIRFLOW_EXC_SER,
)
+ elif isinstance(var, (KeyError, AttributeError)):
+ return cls._encode(
+ cls.serialize(
+ {"exc_cls_name": var.__class__.__name__, "args":
[var.args], "kwargs": {}},
+ use_pydantic_models=use_pydantic_models,
+ strict=strict,
+ ),
+ type_=DAT.BASE_EXC_SER,
+ )
elif isinstance(var, BaseTrigger):
return cls._encode(
cls.serialize(var.serialize(),
use_pydantic_models=use_pydantic_models, strict=strict),
@@ -834,13 +843,16 @@ class BaseSerialization:
return decode_timezone(var)
elif type_ == DAT.RELATIVEDELTA:
return decode_relativedelta(var)
- elif type_ == DAT.AIRFLOW_EXC_SER:
+ elif type_ == DAT.AIRFLOW_EXC_SER or type_ == DAT.BASE_EXC_SER:
deser = cls.deserialize(var,
use_pydantic_models=use_pydantic_models)
exc_cls_name = deser["exc_cls_name"]
args = deser["args"]
kwargs = deser["kwargs"]
del deser
- exc_cls = import_string(exc_cls_name)
+ if type_ == DAT.AIRFLOW_EXC_SER:
+ exc_cls = import_string(exc_cls_name)
+ else:
+ exc_cls = import_string(f"builtins.{exc_cls_name}")
return exc_cls(*args, **kwargs)
elif type_ == DAT.BASE_TRIGGER:
tr_cls_name, kwargs = cls.deserialize(var,
use_pydantic_models=use_pydantic_models)
diff --git a/airflow/settings.py b/airflow/settings.py
index 751bb38760..175a63f69d 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -313,6 +313,8 @@ AIRFLOW_TESTS_PATH = os.path.join(AIRFLOW_PATH, "tests")
AIRFLOW_SETTINGS_PATH = os.path.join(AIRFLOW_PATH, "airflow", "settings.py")
AIRFLOW_UTILS_SESSION_PATH = os.path.join(AIRFLOW_PATH, "airflow", "utils",
"session.py")
AIRFLOW_MODELS_BASEOPERATOR_PATH = os.path.join(AIRFLOW_PATH, "airflow",
"models", "baseoperator.py")
+AIRFLOW_MODELS_DAG_PATH = os.path.join(AIRFLOW_PATH, "airflow", "models",
"dag.py")
+AIRFLOW_DB_UTILS_PATH = os.path.join(AIRFLOW_PATH, "airflow", "utils", "db.py")
class TracebackSessionForTests:
@@ -370,6 +372,9 @@ class TracebackSessionForTests:
:return: True if the object was created from test code, False
otherwise.
"""
self.traceback = traceback.extract_stack()
+ if any(filename.endswith("_pytest/fixtures.py") for filename, _, _, _
in self.traceback):
+ # This is a fixture call
+ return True, None
airflow_frames = [
tb
for tb in self.traceback
@@ -378,24 +383,30 @@ class TracebackSessionForTests:
and not tb.filename == AIRFLOW_UTILS_SESSION_PATH
]
if any(
- filename.endswith("conftest.py") or
filename.endswith("tests/test_utils/db.py")
- for filename, _, _, _ in airflow_frames
+ filename.endswith("conftest.py")
+ or filename.endswith("tests/test_utils/db.py")
+ or (filename.startswith(AIRFLOW_TESTS_PATH) and name in
("setup_method", "teardown_method"))
+ for filename, _, name, _ in airflow_frames
):
# This is a fixture call or testing utilities
return True, None
- if (
- len(airflow_frames) >= 2
- and airflow_frames[-2].filename.startswith(AIRFLOW_TESTS_PATH)
- and airflow_frames[-1].filename == AIRFLOW_MODELS_BASEOPERATOR_PATH
- and airflow_frames[-1].name == "run"
- ):
- # This is baseoperator run method that is called directly from the
test code and this is
- # usual pattern where we create a session in the test code to
create dag_runs for tests.
- # If `run` code will be run inside a real "airflow" code the stack
trace would be longer
- # and it would not be directly called from the test code. Also if
subsequently any of the
- # run_task() method called later from the task code will attempt
to execute any DB
- # method, the stack trace will be longer and we will catch it as
"illegal" call.
- return True, None
+ if len(airflow_frames) >= 2 and
airflow_frames[-2].filename.startswith(AIRFLOW_TESTS_PATH):
+ # Let's look at what we are calling directly from the test code
+ current_filename, current_method_name =
airflow_frames[-1].filename, airflow_frames[-1].name
+ if (current_filename, current_method_name) in (
+ (AIRFLOW_MODELS_BASEOPERATOR_PATH, "run"),
+ (AIRFLOW_MODELS_DAG_PATH, "create_dagrun"),
+ ):
+ # This is baseoperator run method that is called directly from
the test code and this is
+ # usual pattern where we create a session in the test code to
create dag_runs for tests.
+ # If `run` code will be run inside a real "airflow" code the
stack trace would be longer
+ # and it would not be directly called from the test code. Also
if subsequently any of the
+ # run_task() method called later from the task code will
attempt to execute any DB
+ # method, the stack trace will be longer and we will catch it
as "illegal" call.
+ return True, None
+ if current_filename == AIRFLOW_DB_UTILS_PATH:
+ # This is a util method called directly from the test code
+ return True, None
for tb in airflow_frames[::-1]:
if tb.filename.startswith(AIRFLOW_PATH):
if tb.filename.startswith(AIRFLOW_TESTS_PATH):
@@ -407,6 +418,16 @@ class TracebackSessionForTests:
# The traceback line will be always 3rd (two bottom ones are Airflow)
return False, self.traceback[-2]
+ def get_bind(
+ self,
+ mapper=None,
+ clause=None,
+ bind=None,
+ _sa_skip_events=None,
+ _sa_skip_for_implicit_returning=False,
+ ):
+ pass
+
def _is_sqlite_db_path_relative(sqla_conn_str: str) -> bool:
"""Determine whether the database connection URI specifies a relative
path."""
diff --git a/airflow/traces/utils.py b/airflow/traces/utils.py
index afab2591d5..9932c249f0 100644
--- a/airflow/traces/utils.py
+++ b/airflow/traces/utils.py
@@ -22,7 +22,6 @@ from typing import TYPE_CHECKING
from airflow.traces import NO_TRACE_ID
from airflow.utils.hashlib_wrapper import md5
-from airflow.utils.state import TaskInstanceState
if TYPE_CHECKING:
from airflow.models import DagRun, TaskInstance
@@ -75,12 +74,8 @@ def gen_dag_span_id(dag_run: DagRun, as_int: bool = False)
-> str | int:
def gen_span_id(ti: TaskInstance, as_int: bool = False) -> str | int:
"""Generate span id from the task instance."""
dag_run = ti.dag_run
- if ti.state == TaskInstanceState.SUCCESS or ti.state ==
TaskInstanceState.FAILED:
- try_number = ti.try_number - 1
- else:
- try_number = ti.try_number
return _gen_id(
- [dag_run.dag_id, dag_run.run_id, ti.task_id, str(try_number)],
+ [dag_run.dag_id, dag_run.run_id, ti.task_id, str(ti.try_number)],
as_int,
SPAN_ID,
)
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 236beed451..a485f84ed4 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2163,7 +2163,7 @@ class Airflow(AirflowBaseView):
.limit(num_recent_confs)
)
recent_confs = {
- run_id: json.dumps(run_conf)
+ run_id: json.dumps(run_conf, cls=utils_json.WebEncoder)
for run_id, run_conf in ((run.run_id, run.conf) for run in
recent_runs)
if isinstance(run_conf, dict) and any(run_conf)
}
@@ -2198,6 +2198,7 @@ class Airflow(AirflowBaseView):
},
indent=4,
ensure_ascii=False,
+ cls=utils_json.WebEncoder,
)
except TypeError:
flash("Could not pre-populate conf field due to
non-JSON-serializable data-types")
diff --git a/chart/values.schema.json b/chart/values.schema.json
index 5f6a3b55d8..4ba434c341 100644
--- a/chart/values.schema.json
+++ b/chart/values.schema.json
@@ -671,7 +671,7 @@
"tag": {
"description": "The StatsD image tag.",
"type": "string",
- "default": "v0.26.1"
+ "default": "v0.27.1"
},
"pullPolicy": {
"description": "The StatsD image pull policy.",
diff --git a/chart/values.yaml b/chart/values.yaml
index 76130e55bb..a28c3c6d54 100644
--- a/chart/values.yaml
+++ b/chart/values.yaml
@@ -105,7 +105,7 @@ images:
pullPolicy: IfNotPresent
statsd:
repository: quay.io/prometheus/statsd-exporter
- tag: v0.26.1
+ tag: v0.27.1
pullPolicy: IfNotPresent
redis:
repository: redis
diff --git a/clients/python/CHANGELOG.md b/clients/python/CHANGELOG.md
index d56d692d3c..6a254029a4 100644
--- a/clients/python/CHANGELOG.md
+++ b/clients/python/CHANGELOG.md
@@ -17,6 +17,26 @@
under the License.
-->
+# v2.10.0
+
+## Major changes:
+
+ - Add dag_stats rest api endpoint
([#41017](https://github.com/apache/airflow/pull/41017))
+ - AIP-64: Add task instance history list endpoint
([#40988](https://github.com/apache/airflow/pull/40988))
+ - Change DAG Audit log tab to Event Log
([#40967](https://github.com/apache/airflow/pull/40967))
+ - AIP-64: Add REST API endpoints for TI try level details
([#40441](https://github.com/apache/airflow/pull/40441))
+ - Make XCom display as react json
([#40640](https://github.com/apache/airflow/pull/40640))
+ - Replace usages of task context logger with the log table
([#40867](https://github.com/apache/airflow/pull/40867))
+ - Fix tasks API endpoint when DAG doesn't have `start_date`
([#40878](https://github.com/apache/airflow/pull/40878))
+ - Add try_number to log table
([#40739](https://github.com/apache/airflow/pull/40739))
+ - Add executor field to the task instance API
([#40034](https://github.com/apache/airflow/pull/40034))
+ - Add task documentation to details tab in grid view.
([#39899](https://github.com/apache/airflow/pull/39899))
+ - Add max_consecutive_failed_dag_runs in API spec
([#39830](https://github.com/apache/airflow/pull/39830))
+ - Add task failed dependencies to details page.
([#38449](https://github.com/apache/airflow/pull/38449))
+ - Add dag re-parsing request endpoint
([#39138](https://github.com/apache/airflow/pull/39138))
+ - Reorder OpenAPI Spec tags alphabetically
([#38717](https://github.com/apache/airflow/pull/38717))
+
+
# v2.9.1
## Major changes:
diff --git a/clients/python/version.txt b/clients/python/version.txt
index dedcc7d433..10c2c0c3d6 100644
--- a/clients/python/version.txt
+++ b/clients/python/version.txt
@@ -1 +1 @@
-2.9.1
+2.10.0
diff --git a/dev/breeze/src/airflow_breeze/commands/testing_commands.py
b/dev/breeze/src/airflow_breeze/commands/testing_commands.py
index 51ca4bea5d..cef51d9752 100644
--- a/dev/breeze/src/airflow_breeze/commands/testing_commands.py
+++ b/dev/breeze/src/airflow_breeze/commands/testing_commands.py
@@ -206,6 +206,7 @@ def _run_test(
helm_test_package=None,
keep_env_variables=shell_params.keep_env_variables,
no_db_cleanup=shell_params.no_db_cleanup,
+ database_isolation=shell_params.database_isolation,
)
)
run_cmd.extend(list(extra_pytest_args))
@@ -968,6 +969,7 @@ def helm_tests(
helm_test_package=helm_test_package,
keep_env_variables=False,
no_db_cleanup=False,
+ database_isolation=False,
)
cmd = ["docker", "compose", "run", "--service-ports", "--rm", "airflow",
*pytest_args, *extra_pytest_args]
result = run_command(cmd, check=False, env=env,
output_outside_the_group=True)
diff --git
a/dev/breeze/src/airflow_breeze/templates/CHANGELOG_TEMPLATE.rst.jinja2
b/dev/breeze/src/airflow_breeze/templates/CHANGELOG_TEMPLATE.rst.jinja2
index b8a966448c..e51939c575 100644
--- a/dev/breeze/src/airflow_breeze/templates/CHANGELOG_TEMPLATE.rst.jinja2
+++ b/dev/breeze/src/airflow_breeze/templates/CHANGELOG_TEMPLATE.rst.jinja2
@@ -40,7 +40,7 @@ Features
{%- endif %}
-{%- if classified_changes.fixes %}
+{%- if classified_changes and classified_changes.fixes %}
Bug Fixes
~~~~~~~~~
@@ -50,7 +50,7 @@ Bug Fixes
{%- endif %}
-{%- if classified_changes.misc %}
+{%- if classified_changes and classified_changes.misc %}
Misc
~~~~
@@ -62,7 +62,7 @@ Misc
.. Below changes are excluded from the changelog. Move them to
appropriate section above if needed. Do not delete the lines(!):
-{%- if classified_changes.other %}
+{%- if classified_changes and classified_changes.other %}
{%- for other in classified_changes.other %}
* ``{{ other.message_without_backticks | safe }}``
{%- endfor %}
diff --git a/dev/breeze/src/airflow_breeze/utils/run_tests.py
b/dev/breeze/src/airflow_breeze/utils/run_tests.py
index fe099efbc0..73cbb43081 100644
--- a/dev/breeze/src/airflow_breeze/utils/run_tests.py
+++ b/dev/breeze/src/airflow_breeze/utils/run_tests.py
@@ -311,6 +311,7 @@ def generate_args_for_pytest(
helm_test_package: str | None,
keep_env_variables: bool,
no_db_cleanup: bool,
+ database_isolation: bool,
):
result_log_file, warnings_file, coverage_file = test_paths(test_type,
backend, helm_test_package)
if skip_db_tests:
@@ -327,12 +328,13 @@ def generate_args_for_pytest(
helm_test_package=helm_test_package,
python_version=python_version,
)
+ max_fail = 50
args.extend(
[
"--verbosity=0",
"--strict-markers",
"--durations=100",
- "--maxfail=50",
+ f"--maxfail={max_fail}",
"--color=yes",
f"--junitxml={result_log_file}",
# timeouts in seconds for individual tests
@@ -374,7 +376,7 @@ def generate_args_for_pytest(
args.extend(get_excluded_provider_args(python_version))
if use_xdist:
args.extend(["-n", str(parallelism) if parallelism else "auto"])
- # We have to disabke coverage for Python 3.12 because of the issue with
coverage that takes too long, despite
+ # We have to disable coverage for Python 3.12 because of the issue with
coverage that takes too long, despite
# Using experimental support for Python 3.12 PEP 669. The coverage.py is
not yet fully compatible with the
# full scope of PEP-669. That will be fully done when
https://github.com/nedbat/coveragepy/issues/1746 is
# resolve for now we are disabling coverage for Python 3.12, and it causes
slower execution and occasional
@@ -417,5 +419,13 @@ def convert_parallel_types_to_folders(
python_version=python_version,
)
)
- # leave only folders, strip --pytest-args
- return [arg for arg in args if arg.startswith("test")]
+ # leave only folders, strip --pytest-args that exclude some folders with
`-' prefix
+ folders = [arg for arg in args if arg.startswith("test")]
+ # remove specific provider sub-folders if "tests/providers" is already in
the list
+ # This workarounds pytest issues where it will only run tests from
specific subfolders
+ # if both parent and child folders are in the list
+ # The issue in Pytest (changed behaviour in Pytest 8.2 is tracked here
+ # https://github.com/pytest-dev/pytest/issues/12605
+ if "tests/providers" in folders:
+ folders = [folder for folder in folders if not
folder.startswith("tests/providers/")]
+ return folders
diff --git a/dev/breeze/src/airflow_breeze/utils/selective_checks.py
b/dev/breeze/src/airflow_breeze/utils/selective_checks.py
index 62f5a20abc..224e76c251 100644
--- a/dev/breeze/src/airflow_breeze/utils/selective_checks.py
+++ b/dev/breeze/src/airflow_breeze/utils/selective_checks.py
@@ -861,6 +861,10 @@ class SelectiveChecks:
if "Providers" in current_test_types:
current_test_types.remove("Providers")
current_test_types.update({f"Providers[{provider}]" for provider
in get_available_packages()})
+ if self.skip_provider_tests:
+ current_test_types = {
+ test_type for test_type in current_test_types if not
test_type.startswith("Providers")
+ }
return " ".join(sorted(current_test_types))
@cached_property
diff --git a/dev/breeze/tests/test_pytest_args_for_test_types.py
b/dev/breeze/tests/test_pytest_args_for_test_types.py
index a64dccbd06..fbb3785949 100644
--- a/dev/breeze/tests/test_pytest_args_for_test_types.py
+++ b/dev/breeze/tests/test_pytest_args_for_test_types.py
@@ -329,6 +329,19 @@ def
test_pytest_args_for_helm_test_types(helm_test_package: str, pytest_args: li
],
True,
),
+ (
+ "Core Providers[-amazon,google] Providers[amazon]
Providers[google]",
+ [
+ "tests/core",
+ "tests/executors",
+ "tests/jobs",
+ "tests/models",
+ "tests/ti_deps",
+ "tests/utils",
+ "tests/providers",
+ ],
+ False,
+ ),
],
)
def test_folders_for_parallel_test_types(
diff --git a/dev/breeze/tests/test_selective_checks.py
b/dev/breeze/tests/test_selective_checks.py
index 7c0ca94094..6bee6bbc7e 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -1075,7 +1075,7 @@ def test_full_test_needed_when_scripts_changes(files:
tuple[str, ...], expected_
),
(
pytest.param(
- ("INTHEWILD.md",),
+ ("INTHEWILD.md", "tests/providers/asana.py"),
("full tests needed",),
"v2-7-stable",
{
@@ -1097,6 +1097,9 @@ def test_full_test_needed_when_scripts_changes(files:
tuple[str, ...], expected_
"parallel-test-types-list-as-string": "API Always
BranchExternalPython "
"BranchPythonVenv CLI Core ExternalPython Operators Other
PlainAsserts "
"PythonVenv Serialization WWW",
+ "separate-test-types-list-as-string": "API Always
BranchExternalPython "
+ "BranchPythonVenv CLI Core ExternalPython Operators Other
PlainAsserts "
+ "PythonVenv Serialization WWW",
"needs-mypy": "true",
"mypy-folders": "['airflow', 'docs', 'dev']",
},
diff --git a/generated/provider_dependencies.json
b/generated/provider_dependencies.json
index 58fb34b2ad..1ac11366c6 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -41,7 +41,7 @@
"jsonpath_ng>=1.5.3",
"redshift_connector>=2.0.918",
"sqlalchemy_redshift>=0.8.6",
- "watchtower>=3.0.0,<4"
+ "watchtower>=3.0.0,!=3.3.0,<4"
],
"devel-deps": [
"aiobotocore>=2.13.0",
diff --git a/pyproject.toml b/pyproject.toml
index 621a9e48b8..194c4b268c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -405,6 +405,9 @@ combine-as-imports = true
"tests/providers/google/cloud/operators/vertex_ai/test_generative_model.py" =
["E402"]
"tests/providers/google/cloud/triggers/test_vertex_ai.py" = ["E402"]
"tests/providers/openai/hooks/test_openai.py" = ["E402"]
+"tests/providers/opensearch/conftest.py" = ["E402"]
+"tests/providers/opensearch/hooks/test_opensearch.py" = ["E402"]
+"tests/providers/opensearch/operators/test_opensearch.py" = ["E402"]
"tests/providers/openai/operators/test_openai.py" = ["E402"]
"tests/providers/qdrant/hooks/test_qdrant.py" = ["E402"]
"tests/providers/qdrant/operators/test_qdrant.py" = ["E402"]
diff --git a/scripts/docker/entrypoint_ci.sh b/scripts/docker/entrypoint_ci.sh
index 1ff3ef0cd2..6e9b36507b 100755
--- a/scripts/docker/entrypoint_ci.sh
+++ b/scripts/docker/entrypoint_ci.sh
@@ -242,16 +242,17 @@ function check_boto_upgrade() {
echo "${COLOR_BLUE}Upgrading boto3, botocore to latest version to run
Amazon tests with them${COLOR_RESET}"
echo
# shellcheck disable=SC2086
- ${PACKAGING_TOOL_CMD} uninstall ${EXTRA_UNINSTALL_FLAGS} aiobotocore s3fs
yandexcloud || true
+ ${PACKAGING_TOOL_CMD} uninstall ${EXTRA_UNINSTALL_FLAGS} aiobotocore s3fs
yandexcloud opensearch-py || true
# We need to include few dependencies to pass pip check with other
dependencies:
# * oss2 as dependency as otherwise jmespath will be bumped (sync with
alibaba provider)
- # * gcloud-aio-auth limit is needed to be included as it bumps
cryptography (sync with google provider)
+ # * cryptography is kept for snowflake-connector-python limitation (sync
with snowflake provider)
# * requests needs to be limited to be compatible with apache beam (sync
with apache-beam provider)
# * yandexcloud requirements for requests does not match those of
apache.beam and latest botocore
# Both requests and yandexcloud exclusion above might be removed after
# https://github.com/apache/beam/issues/32080 is addressed
- # When you remove yandexcloud from the above list, also remove it from
"test_example_dags.py"
- # in "tests/always".
+ # This is already addressed and planned for 2.59.0 release.
+ # When you remove yandexcloud and opensearch from the above list, you
can also remove the
+ # optional providers_dependencies exclusions from "test_example_dags.py"
in "tests/always".
set -x
# shellcheck disable=SC2086
${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade boto3
botocore \
@@ -289,8 +290,9 @@ function check_pydantic() {
echo
echo "${COLOR_YELLOW}Downgrading Pydantic to < 2${COLOR_RESET}"
echo
+ # Pydantic 1.10.17/1.10.15 conflicts with aws-sam-translator so we
need to exclude it
# shellcheck disable=SC2086
- ${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade
"pydantic<2.0.0"
+ ${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade
"pydantic<2.0.0,!=1.10.17,!=1.10.15"
pip check
else
echo
diff --git a/tests/always/test_example_dags.py
b/tests/always/test_example_dags.py
index 2b5f37631a..b8e3edb99b 100644
--- a/tests/always/test_example_dags.py
+++ b/tests/always/test_example_dags.py
@@ -17,6 +17,7 @@
from __future__ import annotations
import os
+import re
import sys
from glob import glob
from importlib import metadata as importlib_metadata
@@ -39,8 +40,11 @@ OPTIONAL_PROVIDERS_DEPENDENCIES: dict[str, dict[str, str |
None]] = {
# Some examples or system tests may depend on additional packages
# that are not included in certain CI checks.
# The format of the dictionary is as follows:
- # key: the prefix of the file to be excluded,
+ # key: the regexp matching the file to be excluded,
# value: a dictionary containing package distributions with an optional
version specifier, e.g., >=2.3.4
+ ".*example_bedrock_retrieve_and_generate.py": {"opensearch-py": None},
+ ".*example_opensearch.py": {"opensearch-py": None},
+ r".*example_yandexcloud.*\.py": {"yandexcloud": None},
}
IGNORE_AIRFLOW_PROVIDER_DEPRECATION_WARNING: tuple[str, ...] = (
# Certain examples or system tests may trigger
AirflowProviderDeprecationWarnings.
@@ -124,13 +128,6 @@ def example_not_excluded_dags(xfail_db_exception: bool =
False):
for prefix in PROVIDERS_PREFIXES
for provider in suspended_providers_folders
]
- temporary_excluded_upgrade_boto_providers_folders = [
- AIRFLOW_SOURCES_ROOT.joinpath(prefix, provider).as_posix()
- for prefix in PROVIDERS_PREFIXES
- # TODO - remove me when https://github.com/apache/beam/issues/32080 is
addressed
- # and we bring back yandex to be run in case of upgrade boto
- for provider in ["yandex"]
- ]
current_python_excluded_providers_folders = [
AIRFLOW_SOURCES_ROOT.joinpath(prefix, provider).as_posix()
for prefix in PROVIDERS_PREFIXES
@@ -146,18 +143,13 @@ def example_not_excluded_dags(xfail_db_exception: bool =
False):
if candidate.startswith(tuple(suspended_providers_folders)):
param_marks.append(pytest.mark.skip(reason="Suspended
provider"))
- if os.environ.get("UPGRADE_BOTO", "false") == "true" and
candidate.startswith(
- tuple(temporary_excluded_upgrade_boto_providers_folders)
- ):
- param_marks.append(pytest.mark.skip(reason="Temporary excluded
upgrade boto provider"))
-
if
candidate.startswith(tuple(current_python_excluded_providers_folders)):
param_marks.append(
pytest.mark.skip(reason=f"Not supported for Python
{CURRENT_PYTHON_VERSION}")
)
for optional, dependencies in
OPTIONAL_PROVIDERS_DEPENDENCIES.items():
- if candidate.endswith(optional):
+ if re.match(optional, candidate):
for distribution_name, specifier in dependencies.items():
result, reason =
match_optional_dependencies(distribution_name, specifier)
if not result:
diff --git a/tests/cli/commands/test_internal_api_command.py
b/tests/cli/commands/test_internal_api_command.py
index 99992e6266..a1aaf2daca 100644
--- a/tests/cli/commands/test_internal_api_command.py
+++ b/tests/cli/commands/test_internal_api_command.py
@@ -83,6 +83,9 @@ class TestCLIGetNumReadyWorkersRunning:
assert self.monitor._get_num_ready_workers_running() == 0
+# Those tests are skipped in isolation mode because they interfere with the
internal API
+# server already running in the background in the isolation mode.
[email protected]_if_database_isolation_mode
@pytest.mark.db_test
@pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled")
class TestCliInternalAPI(_ComonCLIGunicornTestClass):
diff --git a/tests/cli/commands/test_webserver_command.py
b/tests/cli/commands/test_webserver_command.py
index 07d95a9e5f..fa2e58af9e 100644
--- a/tests/cli/commands/test_webserver_command.py
+++ b/tests/cli/commands/test_webserver_command.py
@@ -226,6 +226,9 @@ class TestCLIGetNumReadyWorkersRunning:
assert self.monitor._get_num_ready_workers_running() == 0
+# Those tests are skipped in isolation mode because they interfere with the
internal API
+# server already running in the background in the isolation mode.
[email protected]_if_database_isolation_mode
@pytest.mark.db_test
class TestCliWebServer(_ComonCLIGunicornTestClass):
main_process_regexp = r"airflow webserver"
diff --git a/tests/decorators/test_bash.py b/tests/decorators/test_bash.py
index ba8948936e..9fa7999e83 100644
--- a/tests/decorators/test_bash.py
+++ b/tests/decorators/test_bash.py
@@ -33,6 +33,9 @@ from tests.test_utils.db import clear_db_dags, clear_db_runs,
clear_rendered_ti_
DEFAULT_DATE = timezone.datetime(2023, 1, 1)
+# TODO(potiuk) see why this test hangs in DB isolation mode
+pytestmark = pytest.mark.skip_if_database_isolation_mode
+
@pytest.mark.db_test
class TestBashDecorator:
diff --git a/tests/decorators/test_branch_external_python.py
b/tests/decorators/test_branch_external_python.py
index d991f22cd5..d2466365be 100644
--- a/tests/decorators/test_branch_external_python.py
+++ b/tests/decorators/test_branch_external_python.py
@@ -24,7 +24,8 @@ import pytest
from airflow.decorators import task
from airflow.utils.state import State
-pytestmark = pytest.mark.db_test
+# TODO: (potiuk) - AIP-44 - check why this test hangs
+pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
class Test_BranchExternalPythonDecoratedOperator:
diff --git a/tests/decorators/test_branch_python.py
b/tests/decorators/test_branch_python.py
index 58bb216246..3cd95b8d2a 100644
--- a/tests/decorators/test_branch_python.py
+++ b/tests/decorators/test_branch_python.py
@@ -22,7 +22,8 @@ import pytest
from airflow.decorators import task
from airflow.utils.state import State
-pytestmark = pytest.mark.db_test
+# TODO: (potiuk) - AIP-44 - check why this test hangs
+pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
class Test_BranchPythonDecoratedOperator:
diff --git a/tests/decorators/test_branch_virtualenv.py
b/tests/decorators/test_branch_virtualenv.py
index a5c23de392..6cdfa1ddff 100644
--- a/tests/decorators/test_branch_virtualenv.py
+++ b/tests/decorators/test_branch_virtualenv.py
@@ -22,7 +22,8 @@ import pytest
from airflow.decorators import task
from airflow.utils.state import State
-pytestmark = pytest.mark.db_test
+# TODO: (potiuk) - AIP-44 - check why this test hangs
+pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
class TestBranchPythonVirtualenvDecoratedOperator:
diff --git a/tests/decorators/test_condition.py
b/tests/decorators/test_condition.py
index 315db6bfe0..28e0f0bf8f 100644
--- a/tests/decorators/test_condition.py
+++ b/tests/decorators/test_condition.py
@@ -28,7 +28,8 @@ if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstance
from airflow.utils.context import Context
-pytestmark = pytest.mark.db_test
+# TODO(potiuk) see why this test hangs in DB isolation mode
+pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation
mode
diff --git a/tests/decorators/test_external_python.py
b/tests/decorators/test_external_python.py
index 5ed5874e3a..0d9a439aa2 100644
--- a/tests/decorators/test_external_python.py
+++ b/tests/decorators/test_external_python.py
@@ -29,7 +29,7 @@ import pytest
from airflow.decorators import setup, task, teardown
from airflow.utils import timezone
-pytestmark = pytest.mark.db_test
+pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag]
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py
index 9d2b9a14c8..067beff3ab 100644
--- a/tests/decorators/test_python.py
+++ b/tests/decorators/test_python.py
@@ -41,7 +41,7 @@ from airflow.utils.types import DagRunType
from airflow.utils.xcom import XCOM_RETURN_KEY
from tests.operators.test_python import BasePythonTest
-pytestmark = pytest.mark.db_test
+pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag]
if TYPE_CHECKING:
@@ -281,6 +281,8 @@ class TestAirflowTaskDecorator(BasePythonTest):
def add_number(self, num: int) -> int:
return self.num + num
+ # TODO(potiuk) see why this test hangs in DB isolation mode
+ @pytest.mark.skip_if_database_isolation_mode
def test_fail_multiple_outputs_key_type(self):
@task_decorator(multiple_outputs=True)
def add_number(num: int):
@@ -293,6 +295,8 @@ class TestAirflowTaskDecorator(BasePythonTest):
with pytest.raises(AirflowException):
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+ # TODO(potiuk) see why this test hangs in DB isolation mode
+ @pytest.mark.skip_if_database_isolation_mode
def test_fail_multiple_outputs_no_dict(self):
@task_decorator(multiple_outputs=True)
def add_number(num: int):
@@ -541,6 +545,8 @@ class TestAirflowTaskDecorator(BasePythonTest):
assert "add_2" in self.dag_non_serialized.task_ids
+ # TODO(potiuk) see why this test hangs in DB isolation mode
+ @pytest.mark.skip_if_database_isolation_mode
def test_dag_task_multiple_outputs(self):
"""Tests dag.task property to generate task with multiple outputs"""
@@ -863,6 +869,7 @@ def test_task_decorator_has_wrapped_attr():
assert decorated_test_func.__wrapped__ is org_test_func, "__wrapped__ attr
is not the original function"
[email protected]_serialized_dag(False)
@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation
mode
def test_upstream_exception_produces_none_xcom(dag_maker, session):
from airflow.exceptions import AirflowSkipException
@@ -900,6 +907,7 @@ def test_upstream_exception_produces_none_xcom(dag_maker,
session):
assert result == "'example' None"
[email protected]_serialized_dag(False)
@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation
mode
@pytest.mark.parametrize("multiple_outputs", [True, False])
def test_multiple_outputs_produces_none_xcom_when_task_is_skipped(dag_maker,
session, multiple_outputs):
@@ -958,6 +966,7 @@ def test_no_warnings(reset_logging_config, caplog):
assert caplog.messages == []
[email protected]_serialized_dag(False)
@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation
mode
def test_task_decorator_dataset(dag_maker, session):
from airflow.datasets import Dataset
diff --git a/tests/decorators/test_python_virtualenv.py
b/tests/decorators/test_python_virtualenv.py
index 554b33ceb9..57a096ef19 100644
--- a/tests/decorators/test_python_virtualenv.py
+++ b/tests/decorators/test_python_virtualenv.py
@@ -30,7 +30,7 @@ from airflow.exceptions import RemovedInAirflow3Warning
from airflow.utils import timezone
from airflow.utils.state import TaskInstanceState
-pytestmark = pytest.mark.db_test
+pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag]
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
PYTHON_VERSION = f"{sys.version_info.major}{sys.version_info.minor}"
@@ -373,6 +373,8 @@ class TestPythonVirtualenvDecorator:
assert teardown_task.on_failure_fail_dagrun is on_failure_fail_dagrun
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+ # TODO(potiuk) see why this test hangs in DB isolation mode
+ @pytest.mark.skip_if_database_isolation_mode
def test_invalid_annotation(self, dag_maker):
import uuid
diff --git a/tests/decorators/test_sensor.py b/tests/decorators/test_sensor.py
index e970894a38..a283ed871b 100644
--- a/tests/decorators/test_sensor.py
+++ b/tests/decorators/test_sensor.py
@@ -26,7 +26,7 @@ from airflow.models import XCom
from airflow.sensors.base import PokeReturnValue
from airflow.utils.state import State
-pytestmark = pytest.mark.db_test
+pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag]
class TestSensorDecorator:
@@ -52,6 +52,7 @@ class TestSensorDecorator:
sf >> df
dr = dag_maker.create_dagrun()
+
sf.operator.run(start_date=dr.execution_date,
end_date=dr.execution_date, ignore_ti_state=True)
tis = dr.get_task_instances()
assert len(tis) == 2
diff --git a/tests/decorators/test_short_circuit.py
b/tests/decorators/test_short_circuit.py
index 1d43de6842..1c8349b6c9 100644
--- a/tests/decorators/test_short_circuit.py
+++ b/tests/decorators/test_short_circuit.py
@@ -24,7 +24,7 @@ from airflow.decorators import task
from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule
-pytestmark = pytest.mark.db_test
+pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
DEFAULT_DATE = datetime(2022, 8, 17)
diff --git a/tests/models/test_taskinstance.py
b/tests/models/test_taskinstance.py
index c2993e9ce8..4516686a67 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1443,7 +1443,10 @@ class TestTaskInstance:
# Parameterized tests to check for the correct firing
# of the trigger_rule under various circumstances of mapped task
# Numeric fields are in order:
- # successes, skipped, failed, upstream_failed, done,removed
+ # successes, skipped, failed, upstream_failed, done,remove
+ # Does not work for database isolation mode because there is local test
monkeypatching of upstream_failed
+ # That never gets propagated to internal_api
+ @pytest.mark.skip_if_database_isolation_mode
@pytest.mark.parametrize(
"trigger_rule, upstream_states, flag_upstream_failed, expect_state,
expect_completed",
[
@@ -1539,8 +1542,10 @@ class TestTaskInstance:
monkeypatch.setattr(_UpstreamTIStates, "calculate", lambda *_:
upstream_states)
ti = dr.get_task_instance("do_something_else", session=session)
ti.map_index = 0
+ base_task = ti.task
+
for map_index in range(1, 5):
- ti = TaskInstance(dr.task_instances[-1].task, run_id=dr.run_id,
map_index=map_index)
+ ti = TaskInstance(base_task, run_id=dr.run_id, map_index=map_index)
session.add(ti)
ti.dag_run = dr
session.flush()
diff --git a/tests/models/test_variable.py b/tests/models/test_variable.py
index 3ec2691e5a..6fb6fa15f2 100644
--- a/tests/models/test_variable.py
+++ b/tests/models/test_variable.py
@@ -30,7 +30,7 @@ from airflow.secrets.metastore import MetastoreBackend
from tests.test_utils import db
from tests.test_utils.config import conf_vars
-pytestmark = pytest.mark.db_test
+pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
class TestVariable:
@@ -47,6 +47,7 @@ class TestVariable:
db.clear_db_variables()
crypto._fernet = None
+ @pytest.mark.skip_if_database_isolation_mode # Does not work in db
isolation mode, internal API has other fernet
@conf_vars({("core", "fernet_key"): "", ("core", "unit_test_mode"):
"True"})
def test_variable_no_encryption(self, session):
"""
@@ -60,6 +61,7 @@ class TestVariable:
# should mask anything. That logic is tested in test_secrets_masker.py
self.mask_secret.assert_called_once_with("value", "key")
+ @pytest.mark.skip_if_database_isolation_mode # Does not work in db
isolation mode, internal API has other fernet
@conf_vars({("core", "fernet_key"): Fernet.generate_key().decode()})
def test_variable_with_encryption(self, session):
"""
@@ -70,6 +72,7 @@ class TestVariable:
assert test_var.is_encrypted
assert test_var.val == "value"
+ @pytest.mark.skip_if_database_isolation_mode # Does not work in db
isolation mode, internal API has other fernet
@pytest.mark.parametrize("test_value", ["value", ""])
def test_var_with_encryption_rotate_fernet_key(self, test_value, session):
"""
@@ -152,6 +155,7 @@ class TestVariable:
Variable.update(key="test_key", value="value2", session=session)
assert "value2" == Variable.get("test_key")
+ @pytest.mark.skip_if_database_isolation_mode # Does not work in db
isolation mode, API server has other ENV
def test_variable_update_fails_on_non_metastore_variable(self, session):
with mock.patch.dict("os.environ", AIRFLOW_VAR_KEY="env-value"):
with pytest.raises(AttributeError):
@@ -281,6 +285,7 @@ class TestVariable:
mock_backend.get_variable.assert_called_once() # second call was not
made because of cache
assert first == second
+ @pytest.mark.skip_if_database_isolation_mode # Does not work in db
isolation mode, internal API has other env
def test_cache_invalidation_on_set(self, session):
with mock.patch.dict("os.environ", AIRFLOW_VAR_KEY="from_env"):
a = Variable.get("key") # value is saved in cache
@@ -316,7 +321,7 @@ def test_masking_only_secret_values(variable_value,
deserialize_json, expected_m
val=variable_value,
)
session.add(var)
- session.flush()
+ session.commit()
# Make sure we re-load it, not just get the cached object back
session.expunge(var)
_secrets_masker().patterns = set()
@@ -326,5 +331,4 @@ def test_masking_only_secret_values(variable_value,
deserialize_json, expected_m
for expected_masked_value in expected_masked_values:
assert expected_masked_value in _secrets_masker().patterns
finally:
- session.rollback()
db.clear_db_variables()
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index 993d70cad3..f242812751 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -1306,6 +1306,9 @@ class
TestPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
"AssertRewritingHook including captured stdout and we need to run "
"it with `--assert=plain` pytest option and PYTEST_PLAIN_ASSERTS=true
.",
)
+ # TODO(potiuk) check if this can be fixed in the future - for now we are
skipping tests with venv
+ # and airflow context in DB isolation mode as they are passing None as DAG.
+ @pytest.mark.skip_if_database_isolation_mode
def test_airflow_context(self, serializer):
def f(
# basic
diff --git a/tests/providers/opensearch/conftest.py
b/tests/providers/opensearch/conftest.py
index 47a447188e..934bbd642a 100644
--- a/tests/providers/opensearch/conftest.py
+++ b/tests/providers/opensearch/conftest.py
@@ -19,12 +19,19 @@ from __future__ import annotations
from typing import Any
import pytest
-from opensearchpy import OpenSearch
+from airflow.hooks.base import BaseHook
from airflow.models import Connection
-from airflow.providers.opensearch.hooks.opensearch import OpenSearchHook
from airflow.utils import db
+try:
+ from opensearchpy import OpenSearch
+
+ from airflow.providers.opensearch.hooks.opensearch import OpenSearchHook
+except ImportError:
+ OpenSearch = None # type: ignore[assignment, misc]
+ OpenSearchHook = BaseHook # type: ignore[assignment,misc]
+
# TODO: FIXME - those Mocks have overrides that are not used but they also do
not make Mypy Happy
# mypy: disable-error-code="override"
diff --git a/tests/providers/opensearch/hooks/test_opensearch.py
b/tests/providers/opensearch/hooks/test_opensearch.py
index 84360ae73f..43075e8532 100644
--- a/tests/providers/opensearch/hooks/test_opensearch.py
+++ b/tests/providers/opensearch/hooks/test_opensearch.py
@@ -18,8 +18,9 @@ from __future__ import annotations
from unittest import mock
-import opensearchpy
import pytest
+
+opensearchpy = pytest.importorskip("opensearchpy")
from opensearchpy import Urllib3HttpConnection
from airflow.exceptions import AirflowException
diff --git a/tests/providers/opensearch/operators/test_opensearch.py
b/tests/providers/opensearch/operators/test_opensearch.py
index 706112fef6..63ad7eafe4 100644
--- a/tests/providers/opensearch/operators/test_opensearch.py
+++ b/tests/providers/opensearch/operators/test_opensearch.py
@@ -17,6 +17,9 @@
from __future__ import annotations
import pytest
+
+opensearchpy = pytest.importorskip("opensearchpy")
+
from opensearchpy import Document, Keyword, Text
from airflow.models import DAG
diff --git a/tests/sensors/test_external_task_sensor.py
b/tests/sensors/test_external_task_sensor.py
index fbebd3d120..e7a5991963 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -809,6 +809,7 @@ exit 0
dag=self.dag,
)
+ @pytest.mark.skip_if_database_isolation_mode # Test is broken in db
isolation mode
def test_external_task_sensor_waits_for_task_check_existence(self):
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
@@ -821,6 +822,7 @@ exit 0
with pytest.raises(AirflowException):
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
+ @pytest.mark.skip_if_database_isolation_mode # Test is broken in db
isolation mode
def test_external_task_sensor_waits_for_dag_check_existence(self):
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
diff --git a/tests/serialization/test_dag_serialization.py
b/tests/serialization/test_dag_serialization.py
index e9c8ceaf03..d1c6787db3 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -399,6 +399,8 @@ def timetable_plugin(monkeypatch):
)
+# TODO: (potiuk) - AIP-44 - check why this test hangs
[email protected]_if_database_isolation_mode
class TestStringifiedDAGs:
"""Unit tests for stringified DAGs."""
diff --git a/tests/www/views/test_views_trigger_dag.py
b/tests/www/views/test_views_trigger_dag.py
index c53213c3e6..9b2b297198 100644
--- a/tests/www/views/test_views_trigger_dag.py
+++ b/tests/www/views/test_views_trigger_dag.py
@@ -19,6 +19,7 @@ from __future__ import annotations
import datetime
import json
+from decimal import Decimal
from urllib.parse import quote
import pytest
@@ -28,6 +29,7 @@ from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator
from airflow.security import permissions
from airflow.utils import timezone
+from airflow.utils.json import WebEncoder
from airflow.utils.session import create_session
from airflow.utils.types import DagRunType
from tests.test_utils.api_connexion_utils import create_test_client
@@ -92,6 +94,32 @@ def test_trigger_dag_conf(admin_client):
assert run.conf == conf_dict
+def test_trigger_dag_conf_serializable_fields(admin_client):
+ test_dag_id = "example_bash_operator"
+ time_now = timezone.utcnow()
+ conf_dict = {
+ "string": "Hello, World!",
+ "date_str": "2024-08-08T09:57:35.300858",
+ "datetime": time_now,
+ "decimal": Decimal(10.465),
+ }
+ expected_conf = {
+ "string": "Hello, World!",
+ "date_str": "2024-08-08T09:57:35.300858",
+ "datetime": time_now.isoformat(),
+ "decimal": 10.465,
+ }
+
+ admin_client.post(f"dags/{test_dag_id}/trigger", data={"conf":
json.dumps(conf_dict, cls=WebEncoder)})
+
+ with create_session() as session:
+ run = session.query(DagRun).filter(DagRun.dag_id ==
test_dag_id).first()
+ assert run is not None
+ assert DagRunType.MANUAL in run.run_id
+ assert run.run_type == DagRunType.MANUAL
+ assert run.conf == expected_conf
+
+
def test_trigger_dag_conf_malformed(admin_client):
test_dag_id = "example_bash_operator"