This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ee200f8a363 Remove AIRFLOW_2_10_PLUS conditions (#49877)
ee200f8a363 is described below
commit ee200f8a363dad626f142fb109d9fc01adcd7947
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue Apr 29 00:17:07 2025 +0200
Remove AIRFLOW_2_10_PLUS conditions (#49877)
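Now that the providers' minimum supported Airflow version has been bumped to
2.10, every AIRFLOW_V_2_10_PLUS check is always true, so the flag and all
code paths guarded by it (including the pre-2.10 fallbacks) can be removed.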
---
.../tests/unit/listeners/class_listener.py | 35 +---
devel-common/src/tests_common/test_utils/compat.py | 34 ++--
devel-common/src/tests_common/test_utils/db.py | 9 +-
.../src/tests_common/test_utils/version_compat.py | 1 -
.../providers/amazon/aws/hooks/redshift_sql.py | 5 +-
.../src/airflow/providers/amazon/version_compat.py | 1 -
.../amazon/aws/example_sagemaker_unified_studio.py | 5 -
.../amazon/aws/executors/ecs/test_ecs_executor.py | 3 +-
.../amazon/tests/unit/amazon/aws/hooks/test_s3.py | 25 +--
.../amazon/aws/log/test_cloudwatch_task_handler.py | 22 +--
.../unit/amazon/aws/operators/test_redshift_sql.py | 201 +++++++++------------
.../providers/apache/spark/version_compat.py | 1 -
.../tests/unit/celery/cli/test_celery_command.py | 9 +-
.../unit/celery/executors/test_celery_executor.py | 6 +-
.../providers/cncf/kubernetes/version_compat.py | 1 -
.../providers/common/compat/assets/__init__.py | 9 +-
.../providers/common/compat/lineage/hook.py | 6 +-
.../providers/common/compat/standard/operators.py | 6 +-
.../providers/common/compat/version_compat.py | 1 -
.../airflow/providers/common/io/version_compat.py | 1 -
.../airflow/providers/databricks/version_compat.py | 1 -
.../providers/dbt/cloud/utils/openlineage.py | 11 +-
.../airflow/providers/dbt/cloud/version_compat.py | 1 -
.../src/airflow/providers/docker/version_compat.py | 1 -
.../src/airflow/providers/edge3/version_compat.py | 1 -
.../providers/elasticsearch/version_compat.py | 1 -
.../src/airflow/providers/google/version_compat.py | 1 -
.../tests/unit/google/cloud/hooks/test_bigquery.py | 3 -
.../tests/unit/google/cloud/hooks/test_gcs.py | 9 -
.../providers/microsoft/azure/version_compat.py | 1 -
.../unit/microsoft/azure/operators/test_msgraph.py | 2 -
.../unit/microsoft/azure/sensors/test_msgraph.py | 2 -
.../providers/openlineage/plugins/listener.py | 36 +---
.../providers/openlineage/plugins/openlineage.py | 6 +-
.../airflow/providers/openlineage/utils/utils.py | 8 +-
.../providers/openlineage/version_compat.py | 1 -
.../unit/openlineage/extractors/test_manager.py | 30 ++-
.../unit/openlineage/plugins/test_execution.py | 6 +-
.../unit/openlineage/plugins/test_listener.py | 39 ++--
.../tests/unit/openlineage/plugins/test_utils.py | 108 +----------
.../tests/unit/openlineage/utils/test_utils.py | 4 +-
.../airflow/providers/opensearch/version_compat.py | 1 -
.../src/airflow/providers/presto/version_compat.py | 1 -
.../src/airflow/providers/redis/version_compat.py | 1 -
.../src/airflow/providers/sftp/version_compat.py | 1 -
.../tests/unit/smtp/notifications/test_smtp.py | 4 +-
.../providers/snowflake/utils/openlineage.py | 14 +-
.../airflow/providers/snowflake/version_compat.py | 1 -
.../tests/unit/snowflake/utils/test_openlineage.py | 8 +-
.../airflow/providers/standard/operators/python.py | 26 +--
.../src/airflow/providers/standard/sensors/time.py | 5 +-
.../providers/standard/triggers/temporal.py | 10 +-
.../airflow/providers/standard/version_compat.py | 1 -
.../standard/decorators/test_external_python.py | 11 --
.../tests/unit/standard/operators/test_python.py | 11 +-
.../tests/unit/standard/triggers/test_temporal.py | 41 -----
.../src/airflow/providers/trino/version_compat.py | 1 -
57 files changed, 205 insertions(+), 585 deletions(-)
diff --git a/airflow-core/tests/unit/listeners/class_listener.py
b/airflow-core/tests/unit/listeners/class_listener.py
index de235abbd40..49a018ea9d8 100644
--- a/airflow-core/tests/unit/listeners/class_listener.py
+++ b/airflow-core/tests/unit/listeners/class_listener.py
@@ -20,7 +20,7 @@ from __future__ import annotations
from airflow.listeners import hookimpl
from airflow.utils.state import DagRunState, TaskInstanceState
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS,
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
if AIRFLOW_V_3_0_PLUS:
@@ -64,8 +64,7 @@ if AIRFLOW_V_3_0_PLUS:
@hookimpl
def on_dag_run_failed(self, dag_run, msg: str):
self.state.append(DagRunState.FAILED)
-
-elif AIRFLOW_V_2_10_PLUS:
+else:
class ClassBasedListener: # type: ignore[no-redef]
def __init__(self):
@@ -95,36 +94,6 @@ elif AIRFLOW_V_2_10_PLUS:
@hookimpl
def on_task_instance_failed(self, previous_state, task_instance,
error: None | str | BaseException):
self.state.append(TaskInstanceState.FAILED)
-else:
-
- class ClassBasedListener: # type: ignore[no-redef]
- def __init__(self):
- self.started_component = None
- self.stopped_component = None
- self.state = []
-
- @hookimpl
- def on_starting(self, component):
- self.started_component = component
- self.state.append(DagRunState.RUNNING)
-
- @hookimpl
- def before_stopping(self, component):
- global stopped_component
- stopped_component = component
- self.state.append(DagRunState.SUCCESS)
-
- @hookimpl
- def on_task_instance_running(self, previous_state, task_instance,
session):
- self.state.append(TaskInstanceState.RUNNING)
-
- @hookimpl
- def on_task_instance_success(self, previous_state, task_instance,
session):
- self.state.append(TaskInstanceState.SUCCESS)
-
- @hookimpl
- def on_task_instance_failed(self, previous_state, task_instance,
session):
- self.state.append(TaskInstanceState.FAILED)
def clear():
diff --git a/devel-common/src/tests_common/test_utils/compat.py
b/devel-common/src/tests_common/test_utils/compat.py
index 59e5fbdaf1a..c5eef051bda 100644
--- a/devel-common/src/tests_common/test_utils/compat.py
+++ b/devel-common/src/tests_common/test_utils/compat.py
@@ -23,8 +23,6 @@ from typing import TYPE_CHECKING, Any, cast
from airflow.exceptions import AirflowOptionalProviderFeatureException
from airflow.utils.helpers import prune_dict
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
-
try:
# ImportError has been renamed to ParseImportError in airflow 2.10.0, and
since our provider tests should
# run on all supported versions of Airflow, this compatibility shim falls
back to the old ImportError so
@@ -86,35 +84,27 @@ else:
except ModuleNotFoundError:
# dataset is renamed to asset since Airflow 3.0
from airflow.models.dataset import (
+ DagScheduleDatasetAliasReference as DagScheduleAssetAliasReference,
DagScheduleDatasetReference as DagScheduleAssetReference,
+ DatasetAliasModel as AssetAliasModel,
DatasetDagRunQueue as AssetDagRunQueue,
DatasetEvent as AssetEvent,
DatasetModel as AssetModel,
TaskOutletDatasetReference as TaskOutletAssetReference,
)
- if AIRFLOW_V_2_10_PLUS:
- from airflow.models.dataset import (
- DagScheduleDatasetAliasReference as
DagScheduleAssetAliasReference,
- DatasetAliasModel as AssetAliasModel,
- )
-
def deserialize_operator(serialized_operator: dict[str, Any]) -> Operator:
- if AIRFLOW_V_2_10_PLUS:
- # In airflow 2.10+ we can deserialize operator using regular
deserialize method.
- # We do not need to use deserialize_operator method explicitly but
some tests are deserializing the
- # operator and in the future they could use regular ``deserialize``
method. This method is a shim
- # to make deserialization of operator works for tests run against
older Airflow versions and tests
- # should use that method instead of calling
``BaseSerialization.deserialize`` directly.
- # We can remove this method and switch to the regular ``deserialize``
method as long as all providers
- # are updated to airflow 2.10+.
- from airflow.serialization.serialized_objects import BaseSerialization
-
- return BaseSerialization.deserialize(serialized_operator)
- from airflow.serialization.serialized_objects import SerializedBaseOperator
-
- return SerializedBaseOperator.deserialize_operator(serialized_operator)
+ # In airflow 2.10+ we can deserialize operator using regular deserialize
method.
+ # We do not need to use deserialize_operator method explicitly but some
tests are deserializing the
+ # operator and in the future they could use regular ``deserialize``
method. This method is a shim
+ # to make deserialization of operator works for tests run against older
Airflow versions and tests
+ # should use that method instead of calling
``BaseSerialization.deserialize`` directly.
+ # We can remove this method and switch to the regular ``deserialize``
method as long as all providers
+ # are updated to airflow 2.10+.
+ from airflow.serialization.serialized_objects import BaseSerialization
+
+ return BaseSerialization.deserialize(serialized_operator)
def connection_to_dict(
diff --git a/devel-common/src/tests_common/test_utils/db.py
b/devel-common/src/tests_common/test_utils/db.py
index 4ab88d1c75e..c5711ec514c 100644
--- a/devel-common/src/tests_common/test_utils/db.py
+++ b/devel-common/src/tests_common/test_utils/db.py
@@ -51,7 +51,7 @@ from tests_common.test_utils.compat import (
ParseImportError,
TaskOutletAssetReference,
)
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS,
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
if TYPE_CHECKING:
from pathlib import Path
@@ -159,11 +159,10 @@ def clear_db_assets():
session.query(AssetDagRunQueue).delete()
session.query(DagScheduleAssetReference).delete()
session.query(TaskOutletAssetReference).delete()
- if AIRFLOW_V_2_10_PLUS:
- from tests_common.test_utils.compat import AssetAliasModel,
DagScheduleAssetAliasReference
+ from tests_common.test_utils.compat import AssetAliasModel,
DagScheduleAssetAliasReference
- session.query(AssetAliasModel).delete()
- session.query(DagScheduleAssetAliasReference).delete()
+ session.query(AssetAliasModel).delete()
+ session.query(DagScheduleAssetAliasReference).delete()
if AIRFLOW_V_3_0_PLUS:
from airflow.models.asset import (
AssetActive,
diff --git a/devel-common/src/tests_common/test_utils/version_compat.py
b/devel-common/src/tests_common/test_utils/version_compat.py
index 7227de2d859..2e990761628 100644
--- a/devel-common/src/tests_common/test_utils/version_compat.py
+++ b/devel-common/src/tests_common/test_utils/version_compat.py
@@ -32,6 +32,5 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
return airflow_version.major, airflow_version.minor, airflow_version.micro
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
[].sort()
diff --git
a/providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_sql.py
b/providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_sql.py
index c74b23352f9..bdd90f1d6ba 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_sql.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_sql.py
@@ -26,7 +26,6 @@ from sqlalchemy.engine.url import URL
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
-from airflow.providers.amazon.version_compat import AIRFLOW_V_2_10_PLUS
from airflow.providers.common.sql.hooks.sql import DbApiHook
if TYPE_CHECKING:
@@ -260,6 +259,4 @@ class RedshiftSQLHook(DbApiHook):
def get_openlineage_default_schema(self) -> str | None:
"""Return current schema. This is usually changed with ``SEARCH_PATH``
parameter."""
- if AIRFLOW_V_2_10_PLUS:
- return self.get_first("SELECT CURRENT_SCHEMA();")[0]
- return super().get_openlineage_default_schema()
+ return self.get_first("SELECT CURRENT_SCHEMA();")[0]
diff --git a/providers/amazon/src/airflow/providers/amazon/version_compat.py
b/providers/amazon/src/airflow/providers/amazon/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/amazon/src/airflow/providers/amazon/version_compat.py
+++ b/providers/amazon/src/airflow/providers/amazon/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
return airflow_version.major, airflow_version.minor, airflow_version.micro
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git
a/providers/amazon/tests/system/amazon/aws/example_sagemaker_unified_studio.py
b/providers/amazon/tests/system/amazon/aws/example_sagemaker_unified_studio.py
index ab5f91ee2fb..ea002228220 100644
---
a/providers/amazon/tests/system/amazon/aws/example_sagemaker_unified_studio.py
+++
b/providers/amazon/tests/system/amazon/aws/example_sagemaker_unified_studio.py
@@ -18,8 +18,6 @@ from __future__ import annotations
from datetime import datetime
-import pytest
-
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
@@ -28,7 +26,6 @@ from
airflow.providers.amazon.aws.operators.sagemaker_unified_studio import (
)
from system.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
"""
Prerequisites: The account which runs this test must manually have the
following:
@@ -42,8 +39,6 @@ The setup tasks will set up the project and configure the
test runner to emulate
Then, the SageMakerNotebookOperator will run a test notebook. This should spin
up a SageMaker training job, run the notebook, and exit successfully.
"""
-pytestmark = pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Test requires
Airflow 2.10+")
-
DAG_ID = "example_sagemaker_unified_studio"
# Externally fetched variables:
diff --git
a/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py
b/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py
index 074d2babd85..70cedadab34 100644
--- a/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py
+++ b/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py
@@ -59,7 +59,7 @@ from airflow.version import version as airflow_version_str
from tests_common import RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES
from tests_common.test_utils.config import conf_vars
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS,
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
pytestmark = pytest.mark.db_test
@@ -380,7 +380,6 @@ class TestEcsExecutorTask:
class TestAwsEcsExecutor:
"""Tests the AWS ECS Executor."""
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Test requires Airflow
2.10+")
@mock.patch("airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor.change_state")
def test_execute(self, change_state_mock, mock_airflow_key, mock_executor,
mock_cmd):
"""Test execution from end-to-end."""
diff --git a/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py
b/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py
index bb585951613..ad31a7d9c07 100644
--- a/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py
+++ b/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py
@@ -43,8 +43,6 @@ from airflow.providers.amazon.aws.hooks.s3 import (
)
from airflow.utils.timezone import datetime
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
-
@pytest.fixture
def mocked_s3_res():
@@ -59,19 +57,17 @@ def s3_bucket(mocked_s3_res):
return bucket
-if AIRFLOW_V_2_10_PLUS:
-
- @pytest.fixture
- def hook_lineage_collector():
- from airflow.lineage import hook
- from airflow.providers.common.compat.lineage.hook import
get_hook_lineage_collector
[email protected]
+def hook_lineage_collector():
+ from airflow.lineage import hook
+ from airflow.providers.common.compat.lineage.hook import
get_hook_lineage_collector
- hook._hook_lineage_collector = None
- hook._hook_lineage_collector = hook.HookLineageCollector()
+ hook._hook_lineage_collector = None
+ hook._hook_lineage_collector = hook.HookLineageCollector()
- yield get_hook_lineage_collector()
+ yield get_hook_lineage_collector()
- hook._hook_lineage_collector = None
+ hook._hook_lineage_collector = None
class TestAwsS3Hook:
@@ -448,7 +444,6 @@ class TestAwsS3Hook:
resource = boto3.resource("s3").Object(s3_bucket, "my_key")
assert resource.get()["Body"].read() == b"Cont\xc3\xa9nt"
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in
Airflow >= 2.10.0")
def test_load_string_exposes_lineage(self, s3_bucket,
hook_lineage_collector):
hook = S3Hook()
@@ -1023,7 +1018,6 @@ class TestAwsS3Hook:
resource = boto3.resource("s3").Object(s3_bucket, "my_key")
assert gz.decompress(resource.get()["Body"].read()) == b"Content"
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in
Airflow >= 2.10.0")
def test_load_file_exposes_lineage(self, s3_bucket, tmp_path,
hook_lineage_collector):
hook = S3Hook()
path = tmp_path / "testfile"
@@ -1091,7 +1085,6 @@ class TestAwsS3Hook:
ACL="private",
)
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in
Airflow >= 2.10.0")
@mock_aws
def test_copy_object_ol_instrumentation(self, s3_bucket,
hook_lineage_collector):
mock_hook = S3Hook()
@@ -1230,7 +1223,6 @@ class TestAwsS3Hook:
assert path.name == output_file
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in
Airflow >= 2.10.0")
@mock.patch("airflow.providers.amazon.aws.hooks.s3.NamedTemporaryFile")
def test_download_file_exposes_lineage(self, mock_temp_file, tmp_path,
hook_lineage_collector):
path = tmp_path / "airflow_tmp_test_s3_hook"
@@ -1273,7 +1265,6 @@ class TestAwsS3Hook:
mock_open.assert_called_once_with(path, "wb")
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in
Airflow >= 2.10.0")
@mock.patch("airflow.providers.amazon.aws.hooks.s3.open")
def test_download_file_with_preserve_name_exposes_lineage(
self, mock_open, tmp_path, hook_lineage_collector
diff --git
a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
index 3696f552d76..e2897457c05 100644
--- a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
+++ b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
@@ -44,7 +44,7 @@ from airflow.utils.state import State
from airflow.utils.timezone import datetime
from tests_common.test_utils.config import conf_vars
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS,
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
def get_time_str(time_in_milliseconds):
@@ -270,19 +270,13 @@ class TestCloudwatchTaskHandler:
{"timestamp": current_time, "message": "Third"},
],
)
- if AIRFLOW_V_2_10_PLUS:
- monkeypatch.setattr(self.cloudwatch_task_handler,
"_read_from_logs_server", lambda a, b: ([], []))
- msg_template = textwrap.dedent("""
- INFO - ::group::Log message source details
- *** Reading remote log from Cloudwatch log_group: {}
log_stream: {}
- INFO - ::endgroup::
- {}
- """)[1:][:-1] # Strip off leading and trailing new lines, but not
spaces
- else:
- msg_template = textwrap.dedent("""
- *** Reading remote log from Cloudwatch log_group: {}
log_stream: {}
- {}
- """).strip()
+ monkeypatch.setattr(self.cloudwatch_task_handler,
"_read_from_logs_server", lambda a, b: ([], []))
+ msg_template = textwrap.dedent("""
+ INFO - ::group::Log message source details
+ *** Reading remote log from Cloudwatch log_group: {} log_stream: {}
+ INFO - ::endgroup::
+ {}
+ """)[1:][:-1] # Strip off leading and trailing new lines, but not
spaces
logs, metadata = self.cloudwatch_task_handler.read(self.ti)
if AIRFLOW_V_3_0_PLUS:
diff --git
a/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_sql.py
b/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_sql.py
index 1b124ee2028..bbd4ade48d1 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_sql.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_sql.py
@@ -17,7 +17,7 @@
# under the License.
from __future__ import annotations
-from unittest.mock import MagicMock, PropertyMock, call, patch
+from unittest.mock import MagicMock, call, patch
import pytest
@@ -40,14 +40,13 @@ MOCK_REGION_NAME = "eu-north-1"
class TestRedshiftSQLOpenLineage:
@patch.dict("os.environ",
AIRFLOW_CONN_AWS_DEFAULT=f"aws://?region_name={MOCK_REGION_NAME}")
@pytest.mark.parametrize(
- "connection_host, connection_extra, expected_identity, is_over_210,
expected_schemaname",
+ "connection_host, connection_extra, expected_identity,
expected_schemaname",
[
# test without a connection host but with a cluster_identifier in
connection extra
(
None,
{"iam": True, "cluster_identifier":
"cluster_identifier_from_extra"},
f"cluster_identifier_from_extra.{MOCK_REGION_NAME}",
- True,
"database.public",
),
# test with a connection host and without a cluster_identifier in
connection extra
@@ -55,7 +54,6 @@ class TestRedshiftSQLOpenLineage:
"cluster_identifier_from_host.id.my_region.redshift.amazonaws.com",
{"iam": True},
"cluster_identifier_from_host.my_region",
- True,
"database.public",
),
# test with both connection host and cluster_identifier in
connection extra
@@ -63,41 +61,20 @@ class TestRedshiftSQLOpenLineage:
"cluster_identifier_from_host.x.y",
{"iam": True, "cluster_identifier":
"cluster_identifier_from_extra"},
f"cluster_identifier_from_extra.{MOCK_REGION_NAME}",
- True,
"database.public",
),
- # test when hostname doesn't match pattern
- ("1.2.3.4", {}, "1.2.3.4", True, "database.public"),
- # test with Airflow below 2.10 not using Hook connection
- (
-
"cluster_identifier_from_host.id.my_region.redshift.amazonaws.com",
- {"iam": True},
- "cluster_identifier_from_host.my_region",
- False,
- "public",
- ),
],
)
- @patch(
- "airflow.providers.amazon.aws.hooks.redshift_sql.AIRFLOW_V_2_10_PLUS",
- new_callable=PropertyMock,
- )
- @patch("airflow.providers.openlineage.utils.utils.AIRFLOW_V_2_10_PLUS",
new_callable=PropertyMock)
@patch("airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook.conn")
def test_execute_openlineage_events(
self,
mock_aws_hook_conn,
- mock_ol_utils,
- mock_redshift_sql,
connection_host,
connection_extra,
expected_identity,
- is_over_210,
expected_schemaname,
# self, mock_aws_hook_conn, connection_host, connection_extra,
expected_identity, is_below_2_10, expected_schemaname
):
- mock_ol_utils.__bool__ = lambda x: is_over_210
- mock_redshift_sql.__bool__ = lambda x: is_over_210
DB_NAME = "database"
DB_SCHEMA_NAME = "public"
@@ -176,97 +153,93 @@ class TestRedshiftSQLOpenLineage:
dbapi_hook.get_conn.return_value.cursor.return_value.fetchall.side_effect = rows
lineage = op.get_openlineage_facets_on_start()
- if is_over_210:
- assert
dbapi_hook.get_conn.return_value.cursor.return_value.execute.mock_calls == [
- call(
- "SELECT SVV_REDSHIFT_COLUMNS.schema_name, "
- "SVV_REDSHIFT_COLUMNS.table_name, "
- "SVV_REDSHIFT_COLUMNS.column_name, "
- "SVV_REDSHIFT_COLUMNS.ordinal_position, "
- "SVV_REDSHIFT_COLUMNS.data_type, "
- "SVV_REDSHIFT_COLUMNS.database_name \n"
- "FROM SVV_REDSHIFT_COLUMNS \n"
- f"WHERE SVV_REDSHIFT_COLUMNS.schema_name =
'{expected_schemaname}' "
- "AND SVV_REDSHIFT_COLUMNS.table_name IN ('little_table') "
- "OR SVV_REDSHIFT_COLUMNS.database_name = 'another_db' "
- "AND SVV_REDSHIFT_COLUMNS.schema_name = 'another_schema'
AND "
- "SVV_REDSHIFT_COLUMNS.table_name IN
('popular_orders_day_of_week')"
- ),
- call(
- "SELECT SVV_REDSHIFT_COLUMNS.schema_name, "
- "SVV_REDSHIFT_COLUMNS.table_name, "
- "SVV_REDSHIFT_COLUMNS.column_name, "
- "SVV_REDSHIFT_COLUMNS.ordinal_position, "
- "SVV_REDSHIFT_COLUMNS.data_type, "
- "SVV_REDSHIFT_COLUMNS.database_name \n"
- "FROM SVV_REDSHIFT_COLUMNS \n"
- f"WHERE SVV_REDSHIFT_COLUMNS.schema_name =
'{expected_schemaname}' "
- "AND SVV_REDSHIFT_COLUMNS.table_name IN ('Test_table')"
- ),
- ]
- else:
- assert
dbapi_hook.get_conn.return_value.cursor.return_value.execute.mock_calls == []
+ assert
dbapi_hook.get_conn.return_value.cursor.return_value.execute.mock_calls == [
+ call(
+ "SELECT SVV_REDSHIFT_COLUMNS.schema_name, "
+ "SVV_REDSHIFT_COLUMNS.table_name, "
+ "SVV_REDSHIFT_COLUMNS.column_name, "
+ "SVV_REDSHIFT_COLUMNS.ordinal_position, "
+ "SVV_REDSHIFT_COLUMNS.data_type, "
+ "SVV_REDSHIFT_COLUMNS.database_name \n"
+ "FROM SVV_REDSHIFT_COLUMNS \n"
+ f"WHERE SVV_REDSHIFT_COLUMNS.schema_name =
'{expected_schemaname}' "
+ "AND SVV_REDSHIFT_COLUMNS.table_name IN ('little_table') "
+ "OR SVV_REDSHIFT_COLUMNS.database_name = 'another_db' "
+ "AND SVV_REDSHIFT_COLUMNS.schema_name = 'another_schema' AND "
+ "SVV_REDSHIFT_COLUMNS.table_name IN
('popular_orders_day_of_week')"
+ ),
+ call(
+ "SELECT SVV_REDSHIFT_COLUMNS.schema_name, "
+ "SVV_REDSHIFT_COLUMNS.table_name, "
+ "SVV_REDSHIFT_COLUMNS.column_name, "
+ "SVV_REDSHIFT_COLUMNS.ordinal_position, "
+ "SVV_REDSHIFT_COLUMNS.data_type, "
+ "SVV_REDSHIFT_COLUMNS.database_name \n"
+ "FROM SVV_REDSHIFT_COLUMNS \n"
+ f"WHERE SVV_REDSHIFT_COLUMNS.schema_name =
'{expected_schemaname}' "
+ "AND SVV_REDSHIFT_COLUMNS.table_name IN ('Test_table')"
+ ),
+ ]
expected_namespace = f"redshift://{expected_identity}:5439"
- if is_over_210:
- assert lineage.inputs == [
- Dataset(
- namespace=expected_namespace,
-
name=f"{ANOTHER_DB_NAME}.{ANOTHER_DB_SCHEMA}.popular_orders_day_of_week",
- facets={
- "schema": SchemaDatasetFacet(
- fields=[
-
SchemaDatasetFacetFields(name="order_day_of_week", type="varchar"),
-
SchemaDatasetFacetFields(name="order_placed_on", type="timestamp"),
- SchemaDatasetFacetFields(name="orders_placed",
type="int4"),
- ]
- )
- },
- ),
- Dataset(
- namespace=expected_namespace,
- name=f"{DB_NAME}.{DB_SCHEMA_NAME}.little_table",
- facets={
- "schema": SchemaDatasetFacet(
- fields=[
-
SchemaDatasetFacetFields(name="order_day_of_week", type="varchar"),
-
SchemaDatasetFacetFields(name="additional_constant", type="varchar"),
- ]
- )
- },
- ),
- ]
- assert lineage.outputs == [
- Dataset(
- namespace=expected_namespace,
- name=f"{DB_NAME}.{DB_SCHEMA_NAME}.test_table",
- facets={
- "schema": SchemaDatasetFacet(
- fields=[
-
SchemaDatasetFacetFields(name="order_day_of_week", type="varchar"),
-
SchemaDatasetFacetFields(name="order_placed_on", type="timestamp"),
- SchemaDatasetFacetFields(name="orders_placed",
type="int4"),
-
SchemaDatasetFacetFields(name="additional_constant", type="varchar"),
- ]
- ),
- "columnLineage": ColumnLineageDatasetFacet(
- fields={
- "additional_constant": Fields(
- inputFields=[
- InputField(
- namespace=expected_namespace,
-
name="database.public.little_table",
- field="additional_constant",
- )
- ],
- transformationDescription="",
- transformationType="",
- )
- }
- ),
- },
- )
- ]
+ assert lineage.inputs == [
+ Dataset(
+ namespace=expected_namespace,
+
name=f"{ANOTHER_DB_NAME}.{ANOTHER_DB_SCHEMA}.popular_orders_day_of_week",
+ facets={
+ "schema": SchemaDatasetFacet(
+ fields=[
+ SchemaDatasetFacetFields(name="order_day_of_week",
type="varchar"),
+ SchemaDatasetFacetFields(name="order_placed_on",
type="timestamp"),
+ SchemaDatasetFacetFields(name="orders_placed",
type="int4"),
+ ]
+ )
+ },
+ ),
+ Dataset(
+ namespace=expected_namespace,
+ name=f"{DB_NAME}.{DB_SCHEMA_NAME}.little_table",
+ facets={
+ "schema": SchemaDatasetFacet(
+ fields=[
+ SchemaDatasetFacetFields(name="order_day_of_week",
type="varchar"),
+
SchemaDatasetFacetFields(name="additional_constant", type="varchar"),
+ ]
+ )
+ },
+ ),
+ ]
+ assert lineage.outputs == [
+ Dataset(
+ namespace=expected_namespace,
+ name=f"{DB_NAME}.{DB_SCHEMA_NAME}.test_table",
+ facets={
+ "schema": SchemaDatasetFacet(
+ fields=[
+ SchemaDatasetFacetFields(name="order_day_of_week",
type="varchar"),
+ SchemaDatasetFacetFields(name="order_placed_on",
type="timestamp"),
+ SchemaDatasetFacetFields(name="orders_placed",
type="int4"),
+
SchemaDatasetFacetFields(name="additional_constant", type="varchar"),
+ ]
+ ),
+ "columnLineage": ColumnLineageDatasetFacet(
+ fields={
+ "additional_constant": Fields(
+ inputFields=[
+ InputField(
+ namespace=expected_namespace,
+ name="database.public.little_table",
+ field="additional_constant",
+ )
+ ],
+ transformationDescription="",
+ transformationType="",
+ )
+ }
+ ),
+ },
+ )
+ ]
assert lineage.job_facets == {"sql": SQLJobFacet(query=sql)}
diff --git
a/providers/apache/spark/src/airflow/providers/apache/spark/version_compat.py
b/providers/apache/spark/src/airflow/providers/apache/spark/version_compat.py
index 21e7170194e..48d122b6696 100644
---
a/providers/apache/spark/src/airflow/providers/apache/spark/version_compat.py
+++
b/providers/apache/spark/src/airflow/providers/apache/spark/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
return airflow_version.major, airflow_version.minor, airflow_version.micro
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git a/providers/celery/tests/unit/celery/cli/test_celery_command.py
b/providers/celery/tests/unit/celery/cli/test_celery_command.py
index b10a7afa126..cae229fd3b5 100644
--- a/providers/celery/tests/unit/celery/cli/test_celery_command.py
+++ b/providers/celery/tests/unit/celery/cli/test_celery_command.py
@@ -31,7 +31,7 @@ from airflow.providers.celery.cli import celery_command
from airflow.providers.celery.cli.celery_command import
_run_stale_bundle_cleanup
from tests_common.test_utils.config import conf_vars
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS,
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
pytestmark = pytest.mark.db_test
@@ -313,13 +313,6 @@ class TestFlowerCommand:
stderr="/tmp/flower-stderr.log",
log="/tmp/flower.log",
)
- if AIRFLOW_V_2_10_PLUS
- else mock.call(
- process="flower",
- stdout="/tmp/flower-stdout.log",
- stderr="/tmp/flower-stderr.log",
- log="/tmp/flower.log",
- )
]
mock_pid_file.assert_has_calls([mock.call(mock_setup_locations.return_value[0],
-1)])
assert mock_open.mock_calls == [
diff --git
a/providers/celery/tests/unit/celery/executors/test_celery_executor.py
b/providers/celery/tests/unit/celery/executors/test_celery_executor.py
index 22dbd59a914..974dc2b06f7 100644
--- a/providers/celery/tests/unit/celery/executors/test_celery_executor.py
+++ b/providers/celery/tests/unit/celery/executors/test_celery_executor.py
@@ -44,7 +44,7 @@ from airflow.utils.state import State
from tests_common.test_utils import db
from tests_common.test_utils.config import conf_vars
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS,
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
pytestmark = pytest.mark.db_test
@@ -226,8 +226,8 @@ class TestCeleryExecutor:
not_adopted_tis = executor.try_adopt_task_instances(tis)
- key_1 = TaskInstanceKey(dag.dag_id, task_1.task_id, None, 0 if
AIRFLOW_V_2_10_PLUS else 1)
- key_2 = TaskInstanceKey(dag.dag_id, task_2.task_id, None, 0 if
AIRFLOW_V_2_10_PLUS else 1)
+ key_1 = TaskInstanceKey(dag.dag_id, task_1.task_id, None, 0)
+ key_2 = TaskInstanceKey(dag.dag_id, task_2.task_id, None, 0)
assert executor.running == {key_1, key_2}
assert executor.tasks == {key_1: AsyncResult("231"), key_2:
AsyncResult("232")}
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py
index 21e7170194e..48d122b6696 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
return airflow_version.major, airflow_version.minor, airflow_version.micro
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git
a/providers/common/compat/src/airflow/providers/common/compat/assets/__init__.py
b/providers/common/compat/src/airflow/providers/common/compat/assets/__init__.py
index 01fb1ab2018..b2b3c3e86e2 100644
---
a/providers/common/compat/src/airflow/providers/common/compat/assets/__init__.py
+++
b/providers/common/compat/src/airflow/providers/common/compat/assets/__init__.py
@@ -20,7 +20,6 @@ from __future__ import annotations
from typing import TYPE_CHECKING
from airflow.providers.common.compat.version_compat import (
- AIRFLOW_V_2_10_PLUS,
AIRFLOW_V_3_0_PLUS,
)
@@ -38,16 +37,12 @@ else:
from airflow.auth.managers.models.resource_details import
DatasetDetails as AssetDetails
from airflow.datasets import (
Dataset as Asset,
+ DatasetAlias as AssetAlias,
DatasetAll as AssetAll,
DatasetAny as AssetAny,
+ expand_alias_to_datasets as expand_alias_to_assets,
)
- if AIRFLOW_V_2_10_PLUS:
- from airflow.datasets import (
- DatasetAlias as AssetAlias,
- expand_alias_to_datasets as expand_alias_to_assets,
- )
-
__all__ = [
"Asset",
diff --git
a/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py
b/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py
index 1b18723c457..bf7f1705c6d 100644
---
a/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py
+++
b/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py
@@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations
-from airflow.providers.common.compat.version_compat import
AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS
+from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS
def _get_asset_compat_hook_lineage_collector():
@@ -85,9 +85,7 @@ def get_hook_lineage_collector():
return get_hook_lineage_collector()
- # HookLineageCollector added in 2.10
- if AIRFLOW_V_2_10_PLUS:
- return _get_asset_compat_hook_lineage_collector()
+ return _get_asset_compat_hook_lineage_collector()
# For the case that airflow has not yet upgraded to 2.10 or higher,
# but using the providers that already uses `get_hook_lineage_collector`
diff --git
a/providers/common/compat/src/airflow/providers/common/compat/standard/operators.py
b/providers/common/compat/src/airflow/providers/common/compat/standard/operators.py
index 0e34419043f..c53e8b53e92 100644
---
a/providers/common/compat/src/airflow/providers/common/compat/standard/operators.py
+++
b/providers/common/compat/src/airflow/providers/common/compat/standard/operators.py
@@ -19,8 +19,6 @@ from __future__ import annotations
from typing import TYPE_CHECKING
-from airflow.providers.common.compat.version_compat import AIRFLOW_V_2_10_PLUS
-
if TYPE_CHECKING:
from airflow.providers.standard.operators.python import (
_SERIALIZERS,
@@ -38,13 +36,11 @@ else:
)
except ModuleNotFoundError:
from airflow.operators.python import (
+ _SERIALIZERS,
PythonOperator,
ShortCircuitOperator,
get_current_context,
)
- if AIRFLOW_V_2_10_PLUS:
- from airflow.operators.python import _SERIALIZERS
-
__all__ = ["PythonOperator", "_SERIALIZERS", "ShortCircuitOperator",
"get_current_context"]
diff --git
a/providers/common/compat/src/airflow/providers/common/compat/version_compat.py
b/providers/common/compat/src/airflow/providers/common/compat/version_compat.py
index 21e7170194e..48d122b6696 100644
---
a/providers/common/compat/src/airflow/providers/common/compat/version_compat.py
+++
b/providers/common/compat/src/airflow/providers/common/compat/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
return airflow_version.major, airflow_version.minor, airflow_version.micro
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git
a/providers/common/io/src/airflow/providers/common/io/version_compat.py
b/providers/common/io/src/airflow/providers/common/io/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/common/io/src/airflow/providers/common/io/version_compat.py
+++ b/providers/common/io/src/airflow/providers/common/io/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
return airflow_version.major, airflow_version.minor, airflow_version.micro
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git
a/providers/databricks/src/airflow/providers/databricks/version_compat.py
b/providers/databricks/src/airflow/providers/databricks/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/databricks/src/airflow/providers/databricks/version_compat.py
+++ b/providers/databricks/src/airflow/providers/databricks/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
return airflow_version.major, airflow_version.minor, airflow_version.micro
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git
a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/utils/openlineage.py
b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/utils/openlineage.py
index 983ee48e63c..b9e56f30f3f 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/utils/openlineage.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/utils/openlineage.py
@@ -22,7 +22,7 @@ import re
from typing import TYPE_CHECKING
from airflow.providers.common.compat.openlineage.check import
require_openlineage_version
-from airflow.providers.dbt.cloud.version_compat import AIRFLOW_V_2_10_PLUS,
AIRFLOW_V_3_0_PLUS
+from airflow.providers.dbt.cloud.version_compat import AIRFLOW_V_3_0_PLUS
if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstance
@@ -48,13 +48,6 @@ def _get_logical_date(task_instance):
return date
-def _get_try_number(val):
- # todo: remove when min airflow version >= 2.10.0
- if AIRFLOW_V_2_10_PLUS:
- return val.try_number
- return val.try_number - 1
-
-
@require_openlineage_version(provider_min_version="2.0.0")
def generate_openlineage_events_from_dbt_cloud_run(
operator: DbtCloudRunJobOperator | DbtCloudJobRunSensor, task_instance:
TaskInstance
@@ -144,7 +137,7 @@ def generate_openlineage_events_from_dbt_cloud_run(
dag_id=task_instance.dag_id,
task_id=operator.task_id,
logical_date=_get_logical_date(task_instance),
- try_number=_get_try_number(task_instance),
+ try_number=task_instance.try_number,
map_index=task_instance.map_index,
)
diff --git
a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py
b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
return airflow_version.major, airflow_version.minor, airflow_version.micro
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git a/providers/docker/src/airflow/providers/docker/version_compat.py
b/providers/docker/src/airflow/providers/docker/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/docker/src/airflow/providers/docker/version_compat.py
+++ b/providers/docker/src/airflow/providers/docker/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
return airflow_version.major, airflow_version.minor, airflow_version.micro
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git a/providers/edge3/src/airflow/providers/edge3/version_compat.py
b/providers/edge3/src/airflow/providers/edge3/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/edge3/src/airflow/providers/edge3/version_compat.py
+++ b/providers/edge3/src/airflow/providers/edge3/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
return airflow_version.major, airflow_version.minor, airflow_version.micro
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git
a/providers/elasticsearch/src/airflow/providers/elasticsearch/version_compat.py
b/providers/elasticsearch/src/airflow/providers/elasticsearch/version_compat.py
index 21e7170194e..48d122b6696 100644
---
a/providers/elasticsearch/src/airflow/providers/elasticsearch/version_compat.py
+++
b/providers/elasticsearch/src/airflow/providers/elasticsearch/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
return airflow_version.major, airflow_version.minor, airflow_version.micro
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git a/providers/google/src/airflow/providers/google/version_compat.py
b/providers/google/src/airflow/providers/google/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/google/src/airflow/providers/google/version_compat.py
+++ b/providers/google/src/airflow/providers/google/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
return airflow_version.major, airflow_version.minor, airflow_version.micro
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
index 45eb7cf2d68..d7febaada86 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
@@ -52,8 +52,6 @@ from airflow.providers.google.cloud.hooks.bigquery import (
split_tablename,
)
-from tests_common.test_utils.compat import AIRFLOW_V_2_10_PLUS
-
pytestmark =
pytest.mark.filterwarnings("error::airflow.exceptions.AirflowProviderDeprecationWarning")
PROJECT_ID = "bq-project"
@@ -1933,7 +1931,6 @@ class TestBigQueryAsyncHookMethods:
assert result == [{"f0_": 22, "f1_": 3.14, "f2_": "PI"}]
[email protected](not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in
Airflow >= 2.10.0")
@pytest.mark.db_test
class TestHookLevelLineage(_BigQueryBaseTestClass):
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.Client")
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_gcs.py
b/providers/google/tests/unit/google/cloud/hooks/test_gcs.py
index 5156ca6d395..326209ee1d0 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_gcs.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_gcs.py
@@ -43,7 +43,6 @@ from airflow.providers.google.common.consts import CLIENT_INFO
from airflow.utils import timezone
from airflow.version import version
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
from unit.google.cloud.utils.base_gcp_mock import
mock_base_gcp_hook_default_project_id
BASE_STRING = "airflow.providers.google.common.hooks.base_google.{}"
@@ -411,7 +410,6 @@ class TestGCSHook:
assert str(ctx.value) == "source_bucket and source_object cannot be
empty."
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in
Airflow >= 2.10.0")
@mock.patch("google.cloud.storage.Bucket.copy_blob")
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
def test_copy_exposes_lineage(self, mock_service, mock_copy,
hook_lineage_collector):
@@ -507,7 +505,6 @@ class TestGCSHook:
assert str(ctx.value) == "source_bucket and source_object cannot be
empty."
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in
Airflow >= 2.10.0")
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
def test_rewrite_exposes_lineage(self, mock_service,
hook_lineage_collector):
source_bucket_name = "test-source-bucket"
@@ -567,7 +564,6 @@ class TestGCSHook:
with pytest.raises(exceptions.NotFound):
self.gcs_hook.delete(bucket_name=test_bucket,
object_name=test_object)
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in
Airflow >= 2.10.0")
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
def test_delete_exposes_lineage(self, mock_service,
hook_lineage_collector):
test_bucket = "test_bucket"
@@ -818,7 +814,6 @@ class TestGCSHook:
assert str(ctx.value) == "bucket_name and destination_object cannot be
empty."
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in
Airflow >= 2.10.0")
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
def test_compose_exposes_lineage(self, mock_service,
hook_lineage_collector):
test_bucket = "test_bucket"
@@ -859,7 +854,6 @@ class TestGCSHook:
assert response == test_object_bytes
download_method.assert_called_once_with()
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in
Airflow >= 2.10.0")
@mock.patch("google.cloud.storage.Blob.download_as_bytes")
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
def test_download_as_bytes_exposes_lineage(self, mock_service,
mock_download, hook_lineage_collector):
@@ -899,7 +893,6 @@ class TestGCSHook:
assert response == test_file
download_filename_method.assert_called_once_with(test_file, timeout=60)
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in
Airflow >= 2.10.0")
@mock.patch("google.cloud.storage.Blob.download_to_filename")
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
def test_download_to_file_exposes_lineage(self, mock_service,
mock_download, hook_lineage_collector):
@@ -1154,7 +1147,6 @@ class TestGCSHookUpload:
assert metadata == blob_object.return_value.metadata
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in
Airflow >= 2.10.0")
@mock.patch("google.cloud.storage.Blob.upload_from_filename")
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
def test_upload_file_exposes_lineage(self, mock_service, mock_upload,
hook_lineage_collector):
@@ -1218,7 +1210,6 @@ class TestGCSHookUpload:
upload_method.assert_called_once_with(testdata_bytes,
content_type="text/plain", timeout=60)
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in
Airflow >= 2.10.0")
@mock.patch("google.cloud.storage.Blob.upload_from_string")
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
def test_upload_data_exposes_lineage(self, mock_service, mock_upload,
hook_lineage_collector):
diff --git
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py
index 21e7170194e..48d122b6696 100644
---
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py
+++
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
return airflow_version.major, airflow_version.minor, airflow_version.micro
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git
a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_msgraph.py
b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_msgraph.py
index e33c71f39f1..6ae597ba0e3 100644
---
a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_msgraph.py
+++
b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_msgraph.py
@@ -33,7 +33,6 @@ from airflow.utils import timezone
from tests_common.test_utils.file_loading import load_file_from_resources,
load_json_from_resources
from tests_common.test_utils.mock_context import mock_context
from tests_common.test_utils.operators.run_deferrable import execute_operator
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
from unit.microsoft.azure.base import Base
from unit.microsoft.azure.test_utils import mock_json_response, mock_response
@@ -261,7 +260,6 @@ class TestMSGraphAsyncOperator(Base):
assert events[0].payload["response"] == base64_encoded_content
@pytest.mark.db_test
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Lambda parameters
works in Airflow >= 2.10.0")
def test_execute_with_lambda_parameter_when_response_is_bytes(self):
content = load_file_from_resources(
dirname(__file__), "..", "resources", "dummy.pdf", mode="rb",
encoding=None
diff --git
a/providers/microsoft/azure/tests/unit/microsoft/azure/sensors/test_msgraph.py
b/providers/microsoft/azure/tests/unit/microsoft/azure/sensors/test_msgraph.py
index c73c76caffe..055165d4fc7 100644
---
a/providers/microsoft/azure/tests/unit/microsoft/azure/sensors/test_msgraph.py
+++
b/providers/microsoft/azure/tests/unit/microsoft/azure/sensors/test_msgraph.py
@@ -28,7 +28,6 @@ from airflow.triggers.base import TriggerEvent
from tests_common.test_utils.file_loading import load_json_from_resources
from tests_common.test_utils.operators.run_deferrable import execute_operator
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
from unit.microsoft.azure.base import Base
from unit.microsoft.azure.test_utils import mock_json_response
@@ -102,7 +101,6 @@ class TestMSGraphSensor(Base):
assert events[2].payload["type"] == "builtins.dict"
assert events[2].payload["response"] == json.dumps(status[1])
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Lambda parameters
works in Airflow >= 2.10.0")
def
test_execute_with_lambda_parameter_and_result_processor_with_new_signature(self):
status = load_json_from_resources(dirname(__file__), "..",
"resources", "status.json")
response = mock_json_response(200, *status)
diff --git
a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
index 545ab651ad3..11228c0d10f 100644
---
a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
+++
b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
@@ -33,7 +33,6 @@ from airflow.providers.openlineage import conf
from airflow.providers.openlineage.extractors import ExtractorManager,
OperatorLineage
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter,
RunState
from airflow.providers.openlineage.utils.utils import (
- AIRFLOW_V_2_10_PLUS,
AIRFLOW_V_3_0_PLUS,
get_airflow_dag_run_facet,
get_airflow_debug_facet,
@@ -59,13 +58,6 @@ if TYPE_CHECKING:
_openlineage_listener: OpenLineageListener | None = None
-def _get_try_number_success(val):
- # todo: remove when min airflow version >= 2.10.0
- if AIRFLOW_V_2_10_PLUS:
- return val.try_number
- return val.try_number - 1
-
-
def _executor_initializer():
"""
Initialize processes for the executor used with DAGRun listener's methods
(on scheduler).
@@ -304,7 +296,7 @@ class OpenLineageListener:
task_uuid = self.adapter.build_task_instance_run_id(
dag_id=dag.dag_id,
task_id=task.task_id,
- try_number=_get_try_number_success(task_instance),
+ try_number=task_instance.try_number,
logical_date=date,
map_index=task_instance.map_index,
)
@@ -366,8 +358,7 @@ class OpenLineageListener:
dagrun = context["dag_run"]
dag = context["dag"]
self._on_task_instance_failed(task_instance, dag, dagrun, task,
error)
-
- elif AIRFLOW_V_2_10_PLUS:
+ else:
@hookimpl
def on_task_instance_failed(
@@ -382,19 +373,6 @@ class OpenLineageListener:
if TYPE_CHECKING:
assert task
self._on_task_instance_failed(task_instance, task.dag,
task_instance.dag_run, task, error)
- else:
-
- @hookimpl
- def on_task_instance_failed(
- self,
- previous_state: TaskInstanceState,
- task_instance: TaskInstance,
- session: Session, # type: ignore[valid-type]
- ) -> None:
- task = task_instance.task
- if TYPE_CHECKING:
- assert task
- self._on_task_instance_failed(task_instance, task.dag,
task_instance.dag_run, task)
def _on_task_instance_failed(
self,
@@ -651,10 +629,7 @@ class OpenLineageListener:
self.log.debug("Executor have not started before
`on_dag_run_success`")
return
- if AIRFLOW_V_2_10_PLUS:
- task_ids = DagRun._get_partial_task_ids(dag_run.dag)
- else:
- task_ids = dag_run.dag.task_ids if dag_run.dag and
dag_run.dag.partial else None
+ task_ids = DagRun._get_partial_task_ids(dag_run.dag)
date = dag_run.logical_date
if AIRFLOW_V_3_0_PLUS and date is None:
@@ -690,10 +665,7 @@ class OpenLineageListener:
self.log.debug("Executor have not started before
`on_dag_run_failed`")
return
- if AIRFLOW_V_2_10_PLUS:
- task_ids = DagRun._get_partial_task_ids(dag_run.dag)
- else:
- task_ids = dag_run.dag.task_ids if dag_run.dag and
dag_run.dag.partial else None
+ task_ids = DagRun._get_partial_task_ids(dag_run.dag)
date = dag_run.logical_date
if AIRFLOW_V_3_0_PLUS and date is None:
diff --git
a/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
b/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
index 36c26657546..b1f465a9409 100644
---
a/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
+++
b/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
@@ -25,7 +25,6 @@ from airflow.providers.openlineage.plugins.macros import (
lineage_parent_id,
lineage_run_id,
)
-from airflow.providers.openlineage.version_compat import AIRFLOW_V_2_10_PLUS
class OpenLineageProviderPlugin(AirflowPlugin):
@@ -40,10 +39,9 @@ class OpenLineageProviderPlugin(AirflowPlugin):
if not conf.is_disabled():
macros = [lineage_job_namespace, lineage_job_name, lineage_run_id,
lineage_parent_id]
listeners = [get_openlineage_listener()]
- if AIRFLOW_V_2_10_PLUS:
- from airflow.lineage.hook import HookLineageReader
+ from airflow.lineage.hook import HookLineageReader
- hook_lineage_readers = [HookLineageReader]
+ hook_lineage_readers = [HookLineageReader]
else:
macros = []
listeners = []
diff --git
a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
index 454b164d0f9..2567cb41802 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
@@ -50,7 +50,7 @@ from airflow.providers.openlineage.utils.selective_enable
import (
is_dag_lineage_enabled,
is_task_lineage_enabled,
)
-from airflow.providers.openlineage.version_compat import AIRFLOW_V_2_10_PLUS,
AIRFLOW_V_3_0_PLUS
+from airflow.providers.openlineage.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.sensors.base import BaseSensorOperator
from airflow.serialization.serialized_objects import SerializedBaseOperator
from airflow.utils.module_loading import import_string
@@ -803,12 +803,6 @@ def get_filtered_unknown_operator_keys(operator:
BaseOperator) -> dict:
def should_use_external_connection(hook) -> bool:
# If we're at Airflow 2.10, the execution is process-isolated, so we can
safely run those again.
- if not AIRFLOW_V_2_10_PLUS:
- return hook.__class__.__name__ not in [
- "SnowflakeHook",
- "SnowflakeSqlApiHook",
- "RedshiftSQLHook",
- ]
return True
diff --git
a/providers/openlineage/src/airflow/providers/openlineage/version_compat.py
b/providers/openlineage/src/airflow/providers/openlineage/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/version_compat.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
return airflow_version.major, airflow_version.minor, airflow_version.micro
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git
a/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py
b/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py
index 73f2c0d7b3d..55e11588343 100644
--- a/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py
+++ b/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py
@@ -38,7 +38,7 @@ from airflow.utils.state import State
from tests_common.test_utils.compat import DateTimeSensor, PythonOperator
from tests_common.test_utils.markers import
skip_if_force_lowest_dependencies_marker
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS,
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
if TYPE_CHECKING:
try:
@@ -53,21 +53,19 @@ if TYPE_CHECKING:
AssetEventDagRunReference = TIRunContext = Any # type: ignore[misc,
assignment]
-if AIRFLOW_V_2_10_PLUS:
-
- @pytest.fixture
- def hook_lineage_collector():
- from airflow.lineage import hook
- from airflow.providers.common.compat.lineage.hook import (
- get_hook_lineage_collector,
- )
[email protected]
+def hook_lineage_collector():
+ from airflow.lineage import hook
+ from airflow.providers.common.compat.lineage.hook import (
+ get_hook_lineage_collector,
+ )
- hook._hook_lineage_collector = None
- hook._hook_lineage_collector = hook.HookLineageCollector()
+ hook._hook_lineage_collector = None
+ hook._hook_lineage_collector = hook.HookLineageCollector()
- yield get_hook_lineage_collector()
+ yield get_hook_lineage_collector()
- hook._hook_lineage_collector = None
+ hook._hook_lineage_collector = None
if AIRFLOW_V_3_0_PLUS:
@@ -281,7 +279,6 @@ def test_convert_to_ol_dataset_table():
@skip_if_force_lowest_dependencies_marker
[email protected](not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in
Airflow >= 2.10.0")
def test_extractor_manager_uses_hook_level_lineage(hook_lineage_collector):
dagrun = MagicMock()
task = MagicMock()
@@ -300,7 +297,6 @@ def
test_extractor_manager_uses_hook_level_lineage(hook_lineage_collector):
assert metadata.outputs == [OpenLineageDataset(namespace="s3://bucket",
name="output_key")]
[email protected](not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in
Airflow >= 2.10.0")
def test_extractor_manager_does_not_use_hook_level_lineage_when_operator(
hook_lineage_collector,
):
@@ -330,8 +326,8 @@ def
test_extractor_manager_does_not_use_hook_level_lineage_when_operator(
@pytest.mark.db_test
@pytest.mark.skipif(
- not AIRFLOW_V_2_10_PLUS or AIRFLOW_V_3_0_PLUS,
- reason="Test for hook level lineage in Airflow >= 2.10.0 < 3.0",
+ AIRFLOW_V_3_0_PLUS,
+ reason="Test for hook level lineage in Airflow < 3.0",
)
def test_extractor_manager_gets_data_from_pythonoperator(session, dag_maker,
hook_lineage_collector):
path = None
diff --git
a/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py
b/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py
index 9f12994e728..88de93e6e28 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py
@@ -37,7 +37,7 @@ from airflow.utils.types import DagRunType
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_runs
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS,
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
if AIRFLOW_V_3_0_PLUS:
from airflow.utils.types import DagRunTriggeredByType
@@ -68,9 +68,7 @@ def has_value_in_events(events, chain, value):
with tempfile.TemporaryDirectory(prefix="venv") as tmp_dir:
listener_path = Path(tmp_dir) / "event"
- @pytest.mark.skipif(
- not AIRFLOW_V_2_10_PLUS or AIRFLOW_V_3_0_PLUS, reason="Test requires
Airflow>=2.10<3.0"
- )
+ @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Test requires Airflow<3.0")
@pytest.mark.usefixtures("reset_logging_config")
class TestOpenLineageExecution:
def teardown_method(self):
diff --git
a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
index 7ece6038c11..3dc2c938835 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
@@ -44,18 +44,17 @@ from airflow.utils.state import DagRunState, State
from tests_common.test_utils.compat import EmptyOperator, PythonOperator
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_runs
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS,
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
pytestmark = pytest.mark.db_test
-EXPECTED_TRY_NUMBER_1 = 1 if AIRFLOW_V_2_10_PLUS else 0
-
-TRY_NUMBER_BEFORE_EXECUTION = 0 if AIRFLOW_V_2_10_PLUS else 1
-TRY_NUMBER_RUNNING = 0 if AIRFLOW_V_2_10_PLUS else 1
-TRY_NUMBER_FAILED = 0 if AIRFLOW_V_2_10_PLUS else 1
-TRY_NUMBER_SUCCESS = 0 if AIRFLOW_V_2_10_PLUS else 2
-TRY_NUMBER_AFTER_EXECUTION = 0 if AIRFLOW_V_2_10_PLUS else 2
+EXPECTED_TRY_NUMBER_1 = 1
+TRY_NUMBER_BEFORE_EXECUTION = 0
+TRY_NUMBER_RUNNING = 0
+TRY_NUMBER_FAILED = 0
+TRY_NUMBER_SUCCESS = 0
+TRY_NUMBER_AFTER_EXECUTION = 0
if TYPE_CHECKING:
from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
@@ -356,8 +355,8 @@ class TestOpenLineageListenerAirflow2:
mock_disabled.return_value = False
err = ValueError("test")
- on_task_failed_listener_kwargs = {"error": err} if AIRFLOW_V_2_10_PLUS
else {}
- expected_err_kwargs = {"error": err if AIRFLOW_V_2_10_PLUS else None}
+ on_task_failed_listener_kwargs = {"error": err}
+ expected_err_kwargs = {"error": err}
listener.on_task_instance_failed(
previous_state=None, task_instance=task_instance,
**on_task_failed_listener_kwargs, session=None
@@ -460,7 +459,7 @@ class TestOpenLineageListenerAirflow2:
parameters derived from the task instance.
"""
listener, task_instance = self._create_listener_and_task_instance()
- on_task_failed_kwargs = {"error": ValueError("test")} if
AIRFLOW_V_2_10_PLUS else {}
+ on_task_failed_kwargs = {"error": ValueError("test")}
listener.on_task_instance_failed(
previous_state=None, task_instance=task_instance,
**on_task_failed_kwargs, session=None
@@ -582,7 +581,7 @@ class TestOpenLineageListenerAirflow2:
mock_get_user_provided_run_facets.return_value = {"custom_facet": 2}
mock_disabled.return_value = True
- on_task_failed_kwargs = {"error": ValueError("test")} if
AIRFLOW_V_2_10_PLUS else {}
+ on_task_failed_kwargs = {"error": ValueError("test")}
listener.on_task_instance_failed(
previous_state=None, task_instance=task_instance,
**on_task_failed_kwargs, session=None
@@ -1011,8 +1010,8 @@ class TestOpenLineageListenerAirflow3:
mock_disabled.return_value = False
err = ValueError("test")
- on_task_failed_listener_kwargs = {"error": err} if AIRFLOW_V_2_10_PLUS
else {}
- expected_err_kwargs = {"error": err if AIRFLOW_V_2_10_PLUS else None}
+ on_task_failed_listener_kwargs = {"error": err}
+ expected_err_kwargs = {"error": err}
listener.on_task_instance_failed(
previous_state=None, task_instance=task_instance,
**on_task_failed_listener_kwargs
@@ -1054,8 +1053,8 @@ class TestOpenLineageListenerAirflow3:
mock_get_job_name.return_value = "job_name"
err = ValueError("test")
- on_task_failed_listener_kwargs = {"error": err} if AIRFLOW_V_2_10_PLUS
else {}
- expected_err_kwargs = {"error": err if AIRFLOW_V_2_10_PLUS else None}
+ on_task_failed_listener_kwargs = {"error": err}
+ expected_err_kwargs = {"error": err}
listener.on_task_instance_failed(
previous_state=None, task_instance=task_instance,
**on_task_failed_listener_kwargs
@@ -1185,7 +1184,7 @@ class TestOpenLineageListenerAirflow3:
parameters derived from the task instance.
"""
listener, task_instance = self._create_listener_and_task_instance()
- on_task_failed_kwargs = {"error": ValueError("test")} if
AIRFLOW_V_2_10_PLUS else {}
+ on_task_failed_kwargs = {"error": ValueError("test")}
listener.on_task_instance_failed(
previous_state=None, task_instance=task_instance,
**on_task_failed_kwargs
@@ -1248,7 +1247,7 @@ class TestOpenLineageListenerAirflow3:
mock_get_user_provided_run_facets.return_value = {"custom_facet": 2}
mock_disabled.return_value = True
- on_task_failed_kwargs = {"error": ValueError("test")} if
AIRFLOW_V_2_10_PLUS else {}
+ on_task_failed_kwargs = {"error": ValueError("test")}
listener.on_task_instance_failed(
previous_state=None, task_instance=task_instance,
**on_task_failed_kwargs
@@ -1427,7 +1426,7 @@ class TestOpenLineageSelectiveEnableAirflow2:
if enable_task:
enable_lineage(self.task_1)
- on_task_failed_kwargs = {"error": ValueError("test")} if
AIRFLOW_V_2_10_PLUS else {}
+ on_task_failed_kwargs = {"error": ValueError("test")}
with conf_vars({("openlineage", "selective_enable"):
selective_enable}):
listener = OpenLineageListener()
@@ -1485,7 +1484,7 @@ class TestOpenLineageSelectiveEnableAirflow2:
if enable_task:
enable_lineage(self.task_1)
- on_task_failed_kwargs = {"error": ValueError("test")} if
AIRFLOW_V_2_10_PLUS else {}
+ on_task_failed_kwargs = {"error": ValueError("test")}
with conf_vars({("openlineage", "selective_enable"):
selective_enable}):
listener = OpenLineageListener()
diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_utils.py
b/providers/openlineage/tests/unit/openlineage/plugins/test_utils.py
index 4d7944a0ad6..ffbb10c8c48 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_utils.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_utils.py
@@ -51,7 +51,7 @@ from airflow.utils.types import DagRunType
from tests_common.test_utils.compat import (
BashOperator,
)
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS,
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
if AIRFLOW_V_3_0_PLUS:
from airflow.utils.types import DagRunTriggeredByType
@@ -562,7 +562,7 @@ def
test_serialize_timetable_with_dataset_or_time_schedule():
@pytest.mark.skipif(
- not AIRFLOW_V_2_10_PLUS or AIRFLOW_V_3_0_PLUS,
+ AIRFLOW_V_3_0_PLUS,
reason="This test checks serialization only in 2.10 conditions",
)
def test_serialize_timetable_2_10_complex_with_alias():
@@ -600,7 +600,7 @@ def test_serialize_timetable_2_10_complex_with_alias():
@pytest.mark.skipif(
- not AIRFLOW_V_2_10_PLUS or AIRFLOW_V_3_0_PLUS,
+ AIRFLOW_V_3_0_PLUS,
reason="This test checks serialization only in 2.10 conditions",
)
def test_serialize_timetable_2_10_single_asset():
@@ -612,7 +612,7 @@ def test_serialize_timetable_2_10_single_asset():
@pytest.mark.skipif(
- not AIRFLOW_V_2_10_PLUS or AIRFLOW_V_3_0_PLUS,
+ AIRFLOW_V_3_0_PLUS,
reason="This test checks serialization only in 2.10 conditions",
)
def test_serialize_timetable_2_10_list_of_assets():
@@ -630,7 +630,7 @@ def test_serialize_timetable_2_10_list_of_assets():
@pytest.mark.skipif(
- not AIRFLOW_V_2_10_PLUS or AIRFLOW_V_3_0_PLUS,
+ AIRFLOW_V_3_0_PLUS,
reason="This test checks serialization only in 2.10 conditions",
)
def test_serialize_timetable_2_10_with_complex_logical_condition():
@@ -665,7 +665,7 @@ def
test_serialize_timetable_2_10_with_complex_logical_condition():
@pytest.mark.skipif(
- not AIRFLOW_V_2_10_PLUS or AIRFLOW_V_3_0_PLUS,
+ AIRFLOW_V_3_0_PLUS,
reason="This test checks serialization only in 2.10 conditions",
)
def test_serialize_timetable_2_10_with_dataset_or_time_schedule():
@@ -709,102 +709,6 @@ def
test_serialize_timetable_2_10_with_dataset_or_time_schedule():
}
-@pytest.mark.skipif(AIRFLOW_V_2_10_PLUS, reason="This test checks serialization only in 2.9 conditions")
-def test_serialize_timetable_2_9_single_asset():
- dag = DAG(dag_id="test", start_date=datetime.datetime(2025, 1, 1),
schedule=Asset("a"))
- dag_info = DagInfo(dag)
- assert dag_info.timetable == {"dataset_condition": {"__type": "dataset",
"uri": "a", "extra": None}}
-
-
-@pytest.mark.skipif(AIRFLOW_V_2_10_PLUS, reason="This test checks serialization only in 2.9 conditions")
-def test_serialize_timetable_2_9_list_of_assets():
- dag = DAG(dag_id="test", start_date=datetime.datetime(2025, 1, 1),
schedule=[Asset("a"), Asset("b")])
- dag_info = DagInfo(dag)
- assert dag_info.timetable == {
- "dataset_condition": {
- "__type": "dataset_all",
- "objects": [
- {"__type": "dataset", "extra": None, "uri": "a"},
- {"__type": "dataset", "extra": None, "uri": "b"},
- ],
- }
- }
-
-
-@pytest.mark.skipif(AIRFLOW_V_2_10_PLUS, reason="This test checks serialization only in 2.9 conditions")
-def test_serialize_timetable_2_9_with_complex_logical_condition():
- dag = DAG(
- dag_id="test",
- start_date=datetime.datetime(2025, 1, 1),
- schedule=(Asset("ds1", extra={"some_extra": 1}) | Asset("ds2"))
- & (Asset("ds3") | Asset("ds4", extra={"another_extra": 345})),
- )
- dag_info = DagInfo(dag)
- assert dag_info.timetable == {
- "dataset_condition": {
- "__type": "dataset_all",
- "objects": [
- {
- "__type": "dataset_any",
- "objects": [
- {"__type": "dataset", "uri": "ds1", "extra":
{"some_extra": 1}},
- {"__type": "dataset", "uri": "ds2", "extra": None},
- ],
- },
- {
- "__type": "dataset_any",
- "objects": [
- {"__type": "dataset", "uri": "ds3", "extra": None},
- {"__type": "dataset", "uri": "ds4", "extra":
{"another_extra": 345}},
- ],
- },
- ],
- }
- }
-
-
-@pytest.mark.skipif(AIRFLOW_V_2_10_PLUS, reason="This test checks serialization only in 2.9 conditions")
-def test_serialize_timetable_2_9_with_dataset_or_time_schedule():
- from airflow.timetables.datasets import DatasetOrTimeSchedule
- from airflow.timetables.trigger import CronTriggerTimetable
-
- dag = DAG(
- dag_id="test",
- start_date=datetime.datetime(2025, 1, 1),
- schedule=DatasetOrTimeSchedule(
- timetable=CronTriggerTimetable("0 0 * 3 *", timezone="UTC"),
- datasets=(Asset("ds1", extra={"some_extra": 1}) | Asset("ds2"))
- & (Asset("ds3") | Asset("ds4", extra={"another_extra": 345})),
- ),
- )
- dag_info = DagInfo(dag)
- assert dag_info.timetable == {
- "timetable": {
- "__type": "airflow.timetables.trigger.CronTriggerTimetable",
- "__var": {"expression": "0 0 * 3 *", "timezone": "UTC",
"interval": 0.0},
- },
- "dataset_condition": {
- "__type": "dataset_all",
- "objects": [
- {
- "__type": "dataset_any",
- "objects": [
- {"__type": "dataset", "uri": "ds1", "extra":
{"some_extra": 1}},
- {"__type": "dataset", "uri": "ds2", "extra": None},
- ],
- },
- {
- "__type": "dataset_any",
- "objects": [
- {"__type": "dataset", "uri": "ds3", "extra": None},
- {"__type": "dataset", "uri": "ds4", "extra":
{"another_extra": 345}},
- ],
- },
- ],
- },
- }
-
-
@pytest.mark.parametrize(
("airflow_version", "ol_version"),
[
diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
index 07b1a90ff9a..27b64daa630 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
@@ -64,7 +64,7 @@ from airflow.utils.types import DagRunType
from tests_common.test_utils.compat import BashOperator, PythonOperator
from tests_common.test_utils.mock_operators import MockOperator
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS,
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
BASH_OPERATOR_PATH = "airflow.providers.standard.operators.bash"
PYTHON_OPERATOR_PATH = "airflow.providers.standard.operators.python"
@@ -1026,7 +1026,7 @@ class TestDagInfoAirflow2:
}
-@pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS or AIRFLOW_V_3_0_PLUS, reason="Airflow 2.10 tests")
+@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Airflow < 3.0 tests")
class TestDagInfoAirflow210:
def test_dag_info_schedule_single_dataset_directly(self):
dag = DAG(
diff --git
a/providers/opensearch/src/airflow/providers/opensearch/version_compat.py
b/providers/opensearch/src/airflow/providers/opensearch/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/opensearch/src/airflow/providers/opensearch/version_compat.py
+++ b/providers/opensearch/src/airflow/providers/opensearch/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
return airflow_version.major, airflow_version.minor, airflow_version.micro
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git a/providers/presto/src/airflow/providers/presto/version_compat.py
b/providers/presto/src/airflow/providers/presto/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/presto/src/airflow/providers/presto/version_compat.py
+++ b/providers/presto/src/airflow/providers/presto/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
return airflow_version.major, airflow_version.minor, airflow_version.micro
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git a/providers/redis/src/airflow/providers/redis/version_compat.py
b/providers/redis/src/airflow/providers/redis/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/redis/src/airflow/providers/redis/version_compat.py
+++ b/providers/redis/src/airflow/providers/redis/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
return airflow_version.major, airflow_version.minor, airflow_version.micro
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git a/providers/sftp/src/airflow/providers/sftp/version_compat.py
b/providers/sftp/src/airflow/providers/sftp/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/sftp/src/airflow/providers/sftp/version_compat.py
+++ b/providers/sftp/src/airflow/providers/sftp/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
return airflow_version.major, airflow_version.minor, airflow_version.micro
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git a/providers/smtp/tests/unit/smtp/notifications/test_smtp.py
b/providers/smtp/tests/unit/smtp/notifications/test_smtp.py
index bacabf87b4d..ff3c02c4da9 100644
--- a/providers/smtp/tests/unit/smtp/notifications/test_smtp.py
+++ b/providers/smtp/tests/unit/smtp/notifications/test_smtp.py
@@ -30,14 +30,12 @@ from airflow.providers.smtp.notifications.smtp import (
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.utils import timezone
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
-
pytestmark = pytest.mark.db_test
SMTP_API_DEFAULT_CONN_ID = SmtpHook.default_conn_name
-NUM_TRY = 0 if AIRFLOW_V_2_10_PLUS else 1
+NUM_TRY = 0
class TestSmtpNotifier:
diff --git
a/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py
b/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py
index 1a3d3f01836..14d682c50cb 100644
--- a/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py
+++ b/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py
@@ -23,9 +23,8 @@ from typing import TYPE_CHECKING
from urllib.parse import quote, urlparse, urlunparse
from airflow.providers.common.compat.openlineage.check import
require_openlineage_version
-from airflow.providers.snowflake.version_compat import AIRFLOW_V_2_10_PLUS,
AIRFLOW_V_3_0_PLUS
+from airflow.providers.snowflake.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.utils import timezone
-from airflow.utils.state import TaskInstanceState
if TYPE_CHECKING:
from openlineage.client.event_v2 import RunEvent
@@ -122,21 +121,12 @@ def _get_ol_run_id(task_instance) -> str:
return date
- def _get_try_number_success():
- """We are running this in the _on_complete, so need to adjust for
try_num changes."""
- # todo: remove when min airflow version >= 2.10.0
- if AIRFLOW_V_2_10_PLUS:
- return task_instance.try_number
- if task_instance.state == TaskInstanceState.SUCCESS:
- return task_instance.try_number - 1
- return task_instance.try_number
-
# Generate same OL run id as is generated for current task instance
return OpenLineageAdapter.build_task_instance_run_id(
dag_id=task_instance.dag_id,
task_id=task_instance.task_id,
logical_date=_get_logical_date(),
- try_number=_get_try_number_success(),
+ try_number=task_instance.try_number,
map_index=task_instance.map_index,
)
diff --git
a/providers/snowflake/src/airflow/providers/snowflake/version_compat.py
b/providers/snowflake/src/airflow/providers/snowflake/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/snowflake/src/airflow/providers/snowflake/version_compat.py
+++ b/providers/snowflake/src/airflow/providers/snowflake/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
return airflow_version.major, airflow_version.minor, airflow_version.micro
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git a/providers/snowflake/tests/unit/snowflake/utils/test_openlineage.py
b/providers/snowflake/tests/unit/snowflake/utils/test_openlineage.py
index 1ecaf75af18..a6c94a7a383 100644
--- a/providers/snowflake/tests/unit/snowflake/utils/test_openlineage.py
+++ b/providers/snowflake/tests/unit/snowflake/utils/test_openlineage.py
@@ -44,8 +44,6 @@ from airflow.providers.snowflake.utils.openlineage import (
from airflow.utils import timezone
from airflow.utils.state import TaskInstanceState
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
-
@pytest.mark.parametrize(
"source,target",
@@ -118,7 +116,7 @@ def test_get_ol_run_id_ti_success():
dag_id="dag_id",
task_id="task_id",
map_index=1,
- try_number=1 if AIRFLOW_V_2_10_PLUS else 2,
+ try_number=1,
logical_date=logical_date,
state=TaskInstanceState.SUCCESS,
)
@@ -150,7 +148,7 @@ def test_get_parent_run_facet():
dag_id="dag_id",
task_id="task_id",
map_index=1,
- try_number=1 if AIRFLOW_V_2_10_PLUS else 2,
+ try_number=1,
logical_date=logical_date,
state=TaskInstanceState.SUCCESS,
)
@@ -553,7 +551,7 @@ def
test_emit_openlineage_events_for_snowflake_queries_without_hook(mock_now, mo
dag_id="dag_id",
task_id="task_id",
map_index=1,
- try_number=1 if AIRFLOW_V_2_10_PLUS else 2,
+ try_number=1,
logical_date=logical_date,
state=TaskInstanceState.SUCCESS, # This will be query default state
if no metadata found
)
diff --git
a/providers/standard/src/airflow/providers/standard/operators/python.py
b/providers/standard/src/airflow/providers/standard/operators/python.py
index 1f6759ef8b8..2f0d288258f 100644
--- a/providers/standard/src/airflow/providers/standard/operators/python.py
+++ b/providers/standard/src/airflow/providers/standard/operators/python.py
@@ -44,12 +44,12 @@ from airflow.exceptions import (
from airflow.models.baseoperator import BaseOperator
from airflow.models.variable import Variable
from airflow.providers.standard.utils.python_virtualenv import
prepare_virtualenv, write_python_script
-from airflow.providers.standard.version_compat import AIRFLOW_V_2_10_PLUS,
AIRFLOW_V_3_0_PLUS
+from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.utils import hashlib_wrapper
from airflow.utils.context import context_copy_partial, context_merge
from airflow.utils.file import get_unique_dag_module_name
from airflow.utils.operator_helpers import KeywordParameters
-from airflow.utils.process_utils import execute_in_subprocess,
execute_in_subprocess_with_kwargs
+from airflow.utils.process_utils import execute_in_subprocess
if AIRFLOW_V_3_0_PLUS:
from airflow.providers.standard.operators.branch import BranchMixIn
@@ -200,12 +200,10 @@ class PythonOperator(BaseOperator):
from airflow.sdk.execution_time.context import
context_get_outlet_events
return create_executable_runner,
context_get_outlet_events(context)
- if AIRFLOW_V_2_10_PLUS:
- from airflow.utils.context import context_get_outlet_events #
type: ignore
- from airflow.utils.operator_helpers import
ExecutionCallableRunner # type: ignore
+ from airflow.utils.context import context_get_outlet_events #
type: ignore
+ from airflow.utils.operator_helpers import ExecutionCallableRunner
# type: ignore
- return ExecutionCallableRunner,
context_get_outlet_events(context)
- return None
+ return ExecutionCallableRunner, context_get_outlet_events(context)
self.__prepare_execution = __prepare_execution
@@ -560,16 +558,10 @@ class _BasePythonVirtualenvOperator(PythonOperator,
metaclass=ABCMeta):
os.fspath(termination_log_path),
os.fspath(airflow_context_path),
]
- if AIRFLOW_V_2_10_PLUS:
- execute_in_subprocess(
- cmd=cmd,
- env=env_vars,
- )
- else:
- execute_in_subprocess_with_kwargs(
- cmd=cmd,
- env=env_vars,
- )
+ execute_in_subprocess(
+ cmd=cmd,
+ env=env_vars,
+ )
except subprocess.CalledProcessError as e:
if e.returncode in self.skip_on_exit_code:
raise AirflowSkipException(f"Process exited with code
{e.returncode}. Skipping.")
diff --git a/providers/standard/src/airflow/providers/standard/sensors/time.py
b/providers/standard/src/airflow/providers/standard/sensors/time.py
index ee9dde773fe..bbe93dbc01a 100644
--- a/providers/standard/src/airflow/providers/standard/sensors/time.py
+++ b/providers/standard/src/airflow/providers/standard/sensors/time.py
@@ -22,7 +22,6 @@ from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, NoReturn
from airflow.providers.standard.triggers.temporal import DateTimeTrigger
-from airflow.providers.standard.version_compat import AIRFLOW_V_2_10_PLUS
from airflow.sensors.base import BaseSensorOperator
try:
@@ -123,9 +122,7 @@ class TimeSensorAsync(BaseSensorOperator):
def execute(self, context: Context) -> NoReturn:
self.defer(
- trigger=DateTimeTrigger(moment=self.target_datetime,
end_from_trigger=self.end_from_trigger)
- if AIRFLOW_V_2_10_PLUS
- else DateTimeTrigger(moment=self.target_datetime),
+ trigger=DateTimeTrigger(moment=self.target_datetime,
end_from_trigger=self.end_from_trigger),
method_name="execute_complete",
)
diff --git
a/providers/standard/src/airflow/providers/standard/triggers/temporal.py
b/providers/standard/src/airflow/providers/standard/triggers/temporal.py
index 48ebb223cb6..5f7451a0e84 100644
--- a/providers/standard/src/airflow/providers/standard/triggers/temporal.py
+++ b/providers/standard/src/airflow/providers/standard/triggers/temporal.py
@@ -23,14 +23,9 @@ from typing import Any
import pendulum
-from airflow.exceptions import AirflowException
-from airflow.providers.standard.version_compat import AIRFLOW_V_2_10_PLUS
-from airflow.triggers.base import BaseTrigger, TriggerEvent
+from airflow.triggers.base import BaseTrigger, TaskSuccessEvent, TriggerEvent
from airflow.utils import timezone
-if AIRFLOW_V_2_10_PLUS:
- from airflow.triggers.base import TaskSuccessEvent
-
class DateTimeTrigger(BaseTrigger):
"""
@@ -54,9 +49,6 @@ class DateTimeTrigger(BaseTrigger):
if moment.tzinfo is None:
raise ValueError("You cannot pass naive datetimes")
self.moment: pendulum.DateTime = timezone.convert_to_utc(moment)
- if not AIRFLOW_V_2_10_PLUS and end_from_trigger:
- raise AirflowException("end_from_trigger is only supported in
Airflow 2.10 and later. ")
-
self.end_from_trigger = end_from_trigger
def serialize(self) -> tuple[str, dict[str, Any]]:
diff --git
a/providers/standard/src/airflow/providers/standard/version_compat.py
b/providers/standard/src/airflow/providers/standard/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/standard/src/airflow/providers/standard/version_compat.py
+++ b/providers/standard/src/airflow/providers/standard/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
return airflow_version.major, airflow_version.minor, airflow_version.micro
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git
a/providers/standard/tests/unit/standard/decorators/test_external_python.py
b/providers/standard/tests/unit/standard/decorators/test_external_python.py
index ef2007048f5..1b4bba68c24 100644
--- a/providers/standard/tests/unit/standard/decorators/test_external_python.py
+++ b/providers/standard/tests/unit/standard/decorators/test_external_python.py
@@ -29,8 +29,6 @@ import pytest
from airflow.decorators import setup, task, teardown
from airflow.utils import timezone
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
-
pytestmark = pytest.mark.db_test
@@ -68,7 +66,6 @@ def venv_python_with_cloudpickle_and_dill(tmp_path_factory):
class TestExternalPythonDecorator:
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support
came in after 2.10")
@pytest.mark.parametrize(
"serializer",
[
@@ -89,7 +86,6 @@ class TestExternalPythonDecorator:
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support
came in after 2.10")
@pytest.mark.parametrize(
"serializer",
[
@@ -115,7 +111,6 @@ class TestExternalPythonDecorator:
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support
came in after 2.10")
@pytest.mark.parametrize(
"serializer",
[
@@ -147,7 +142,6 @@ class TestExternalPythonDecorator:
with pytest.raises(CalledProcessError):
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support
came in after 2.10")
@pytest.mark.parametrize(
"serializer",
[
@@ -170,7 +164,6 @@ class TestExternalPythonDecorator:
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support
came in after 2.10")
@pytest.mark.parametrize(
"serializer",
[
@@ -191,7 +184,6 @@ class TestExternalPythonDecorator:
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support
came in after 2.10")
@pytest.mark.parametrize(
"serializer",
[
@@ -212,7 +204,6 @@ class TestExternalPythonDecorator:
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support
came in after 2.10")
@pytest.mark.parametrize(
"serializer",
[
@@ -239,7 +230,6 @@ class TestExternalPythonDecorator:
assert setup_task.is_setup
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support
came in after 2.10")
@pytest.mark.parametrize(
"serializer",
[
@@ -266,7 +256,6 @@ class TestExternalPythonDecorator:
assert teardown_task.is_teardown
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
- @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support
came in after 2.10")
@pytest.mark.parametrize(
"serializer",
[
diff --git a/providers/standard/tests/unit/standard/operators/test_python.py
b/providers/standard/tests/unit/standard/operators/test_python.py
index 4206db5481b..45a1e395e1a 100644
--- a/providers/standard/tests/unit/standard/operators/test_python.py
+++ b/providers/standard/tests/unit/standard/operators/test_python.py
@@ -70,7 +70,7 @@ from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.types import NOTSET, DagRunType
from tests_common.test_utils.db import clear_db_runs
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS,
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
if TYPE_CHECKING:
from airflow.models.dagrun import DagRun
@@ -936,11 +936,10 @@ class BaseTestPythonVirtualenvOperator(BasePythonTest):
"conn", # Accessor for Connection.
"map_index_template",
}
- if AIRFLOW_V_2_10_PLUS:
- intentionally_excluded_context_keys |= {
- "inlet_events",
- "outlet_events",
- }
+ intentionally_excluded_context_keys |= {
+ "inlet_events",
+ "outlet_events",
+ }
ti = create_task_instance(dag_id=self.dag_id, task_id=self.task_id,
schedule=None)
context = ti.get_template_context()
diff --git a/providers/standard/tests/unit/standard/triggers/test_temporal.py
b/providers/standard/tests/unit/standard/triggers/test_temporal.py
index 91e27298c50..fc85eab8273 100644
--- a/providers/standard/tests/unit/standard/triggers/test_temporal.py
+++ b/providers/standard/tests/unit/standard/triggers/test_temporal.py
@@ -29,8 +29,6 @@ from airflow.utils import timezone
from airflow.utils.state import TaskInstanceState
from airflow.utils.timezone import utcnow
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
-
def test_input_validation():
"""
@@ -76,7 +74,6 @@ def test_timedelta_trigger_serialization():
assert -2 < (kwargs["moment"] - expected_moment).total_seconds() < 2
-@pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Only for Airflow 2.10+")
@pytest.mark.parametrize(
"tz, end_from_trigger",
[
@@ -116,44 +113,6 @@ async def
test_datetime_trigger_timing_airflow_2_10_plus(tz, end_from_trigger):
assert result.payload == expected_payload
-@pytest.mark.skipif(AIRFLOW_V_2_10_PLUS, reason="Only for Airflow < 2.10+")
-@pytest.mark.parametrize(
- "tz",
- [
- timezone.parse_timezone("UTC"),
- timezone.parse_timezone("Europe/Paris"),
- timezone.parse_timezone("America/Toronto"),
- ],
-)
-@pytest.mark.asyncio
-async def test_datetime_trigger_timing(tz):
- """
- Tests that the DateTimeTrigger only goes off on or after the appropriate
- time.
- """
- past_moment = pendulum.instance((timezone.utcnow() -
datetime.timedelta(seconds=60)).astimezone(tz))
- future_moment = pendulum.instance((timezone.utcnow() +
datetime.timedelta(seconds=60)).astimezone(tz))
-
- # Create a task that runs the trigger for a short time then cancels it
- trigger = DateTimeTrigger(future_moment)
- trigger_task = asyncio.create_task(trigger.run().__anext__())
- await asyncio.sleep(0.5)
-
- # It should not have produced a result
- assert trigger_task.done() is False
- trigger_task.cancel()
-
- # Now, make one waiting for en event in the past and do it again
- trigger = DateTimeTrigger(past_moment)
- trigger_task = asyncio.create_task(trigger.run().__anext__())
- await asyncio.sleep(0.5)
-
- assert trigger_task.done() is True
- result = trigger_task.result()
- assert isinstance(result, TriggerEvent)
- assert result.payload == past_moment
-
-
@mock.patch("airflow.providers.standard.triggers.temporal.timezone.utcnow")
@mock.patch("airflow.providers.standard.triggers.temporal.asyncio.sleep")
@pytest.mark.asyncio
diff --git a/providers/trino/src/airflow/providers/trino/version_compat.py
b/providers/trino/src/airflow/providers/trino/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/trino/src/airflow/providers/trino/version_compat.py
+++ b/providers/trino/src/airflow/providers/trino/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
return airflow_version.major, airflow_version.minor, airflow_version.micro
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)