This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch genproviders in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 6a730ba000b3d3eb225959f6b7ad6adf903bfacc Author: Maciej Obuchowski <[email protected]> AuthorDate: Tue Oct 15 15:25:42 2024 +0200 generate version of providers compatible with 2.7 Signed-off-by: Maciej Obuchowski <[email protected]> --- airflow/providers/common/compat/__init__.py | 4 +- airflow/providers/common/compat/assets/__init__.py | 6 ++- airflow/providers/common/compat/provider.yaml | 2 +- airflow/providers/openlineage/__init__.py | 4 +- .../providers/openlineage/extractors/manager.py | 2 + airflow/providers/openlineage/provider.yaml | 2 +- airflow/providers/openlineage/utils/utils.py | 45 +++++++++++++++++++-- ..._providers_common_compat-1.2.1-py3-none-any.whl | Bin 0 -> 18789 bytes ...w_providers_openlineage-1.12.2-py3-none-any.whl | Bin 0 -> 59601 bytes .../prepare_providers/provider_packages.py | 1 + dev/breeze/src/airflow_breeze/utils/packages.py | 2 +- generated/provider_dependencies.json | 4 +- 12 files changed, 57 insertions(+), 15 deletions(-) diff --git a/airflow/providers/common/compat/__init__.py b/airflow/providers/common/compat/__init__.py index ef51cb422e5..7d80601c060 100644 --- a/airflow/providers/common/compat/__init__.py +++ b/airflow/providers/common/compat/__init__.py @@ -32,8 +32,8 @@ __all__ = ["__version__"] __version__ = "1.2.1" if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse( - "2.8.0" + "2.7.0" ): raise RuntimeError( - f"The package `apache-airflow-providers-common-compat:{__version__}` needs Apache Airflow 2.8.0+" + f"The package `apache-airflow-providers-common-compat:{__version__}` needs Apache Airflow 2.7.0+" ) diff --git a/airflow/providers/common/compat/assets/__init__.py b/airflow/providers/common/compat/assets/__init__.py index 460204a4e41..3547dd99213 100644 --- a/airflow/providers/common/compat/assets/__init__.py +++ b/airflow/providers/common/compat/assets/__init__.py @@ -47,11 +47,13 @@ else: _IS_AIRFLOW_2_10_OR_HIGHER = 
Version(Version(AIRFLOW_VERSION).base_version) >= Version("2.10.0") _IS_AIRFLOW_2_9_OR_HIGHER = Version(Version(AIRFLOW_VERSION).base_version) >= Version("2.9.0") - + _IS_AIRFLOW_2_8_OR_HIGHER = Version(Version(AIRFLOW_VERSION).base_version) >= Version("2.8.0") # dataset is renamed to asset since Airflow 3.0 - from airflow.auth.managers.models.resource_details import DatasetDetails as AssetDetails from airflow.datasets import Dataset as Asset + if _IS_AIRFLOW_2_8_OR_HIGHER: + from airflow.auth.managers.models.resource_details import DatasetDetails as AssetDetails + if _IS_AIRFLOW_2_9_OR_HIGHER: from airflow.datasets import ( DatasetAll as AssetAll, diff --git a/airflow/providers/common/compat/provider.yaml b/airflow/providers/common/compat/provider.yaml index 3618ecfe643..4f6befcdf77 100644 --- a/airflow/providers/common/compat/provider.yaml +++ b/airflow/providers/common/compat/provider.yaml @@ -31,7 +31,7 @@ versions: - 1.0.0 dependencies: - - apache-airflow>=2.8.0 + - apache-airflow>=2.7.0 integrations: - integration-name: Common Compat diff --git a/airflow/providers/openlineage/__init__.py b/airflow/providers/openlineage/__init__.py index 676fae32d25..3b904adb168 100644 --- a/airflow/providers/openlineage/__init__.py +++ b/airflow/providers/openlineage/__init__.py @@ -32,8 +32,8 @@ __all__ = ["__version__"] __version__ = "1.12.2" if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse( - "2.8.0" + "2.7.0" ): raise RuntimeError( - f"The package `apache-airflow-providers-openlineage:{__version__}` needs Apache Airflow 2.8.0+" + f"The package `apache-airflow-providers-openlineage:{__version__}` needs Apache Airflow 2.7.0+" ) diff --git a/airflow/providers/openlineage/extractors/manager.py b/airflow/providers/openlineage/extractors/manager.py index c72c989d893..583254c1fd8 100644 --- a/airflow/providers/openlineage/extractors/manager.py +++ b/airflow/providers/openlineage/extractors/manager.py @@ -191,6 +191,8 @@ 
class ExtractorManager(LoggingMixin): except ImportError: return None + if not hasattr(get_hook_lineage_collector(), "has_collected"): + return None if not get_hook_lineage_collector().has_collected: return None diff --git a/airflow/providers/openlineage/provider.yaml b/airflow/providers/openlineage/provider.yaml index fadaf9c1e07..bd6397cece3 100644 --- a/airflow/providers/openlineage/provider.yaml +++ b/airflow/providers/openlineage/provider.yaml @@ -49,7 +49,7 @@ versions: - 1.0.0 dependencies: - - apache-airflow>=2.8.0 + - apache-airflow>=2.7.0 - apache-airflow-providers-common-sql>=1.6.0 - apache-airflow-providers-common-compat>=1.2.0 - attrs>=22.2 diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py index ca57755d692..a7adb67a5e4 100644 --- a/airflow/providers/openlineage/utils/utils.py +++ b/airflow/providers/openlineage/utils/utils.py @@ -257,9 +257,32 @@ class DagInfo(InfoJsonEncodable): "start_date", "tags", ] - casts = {"timetable": lambda dag: dag.timetable.serialize() if getattr(dag, "timetable", None) else None} + casts = {"timetable": lambda dag: DagInfo.serialize_timetable(dag)} renames = {"_dag_id": "dag_id"} + @classmethod + def serialize_timetable(cls, dag): + serialized = dag.timetable.serialize() + if serialized != {} and serialized is not None: + return serialized + try: + from airflow.serialization.serialized_objects import encode_dataset_condition + + return {"dataset_condition": encode_dataset_condition(dag.timetable.dataset_condition)} + except ImportError: + if hasattr(dag, "dataset_triggers"): + triggers = dag.dataset_triggers + return { + "dataset_condition": { + "__type": "dataset_all", + "objects": [ + {"__type": "dataset", "uri": trigger.uri, "extra": trigger.extra} + for trigger in triggers + ], + } + } + return {} + class DagRunInfo(InfoJsonEncodable): """Defines encoding DagRun object to JSON.""" @@ -445,7 +468,20 @@ def get_airflow_job_facet(dag_run: DagRun) -> dict[str, 
AirflowJobFacet]: def get_airflow_state_run_facet( dag_id: str, run_id: str, task_ids: list[str], dag_run_state: DagRunState ) -> dict[str, AirflowStateRunFacet]: - tis = DagRun.fetch_task_instances(dag_id=dag_id, run_id=run_id, task_ids=task_ids) + try: + tis = DagRun.fetch_task_instances(dag_id=dag_id, run_id=run_id, task_ids=task_ids) + except AttributeError: + try: + dr = DagRun(dag_id=dag_id, run_id=run_id) + tis = dr.get_task_instances() + except: # noqa: E722 + log.debug( + "Failed to fetch task instances for %s %s %s - Airflow version too old.", + dag_id, + run_id, + task_ids, + ) + return {} return { "airflowState": AirflowStateRunFacet( dagRunState=dag_run_state, @@ -609,10 +645,11 @@ def print_warning(log): try: return f(*args, **kwargs) except Exception as e: + print("?") log.warning( - "Note: exception below is being caught: it's printed for visibility. However OpenLineage events aren't being emitted. If you see that, task has completed successfully despite not getting OL events." + "Note: exception below is being caught: it's printed for visibility. However OpenLineage events aren't being emitted. 
If you see that, task has completed successfully despite not getting OL events.", + exc_info=e, ) - log.warning(e) return wrapper diff --git a/apache_airflow_providers_common_compat-1.2.1-py3-none-any.whl b/apache_airflow_providers_common_compat-1.2.1-py3-none-any.whl new file mode 100644 index 00000000000..ada225e1d55 Binary files /dev/null and b/apache_airflow_providers_common_compat-1.2.1-py3-none-any.whl differ diff --git a/apache_airflow_providers_openlineage-1.12.2-py3-none-any.whl b/apache_airflow_providers_openlineage-1.12.2-py3-none-any.whl new file mode 100644 index 00000000000..d9646b10d49 Binary files /dev/null and b/apache_airflow_providers_openlineage-1.12.2-py3-none-any.whl differ diff --git a/dev/breeze/src/airflow_breeze/prepare_providers/provider_packages.py b/dev/breeze/src/airflow_breeze/prepare_providers/provider_packages.py index 88ad3e8c8cf..5c86120b560 100644 --- a/dev/breeze/src/airflow_breeze/prepare_providers/provider_packages.py +++ b/dev/breeze/src/airflow_breeze/prepare_providers/provider_packages.py @@ -162,6 +162,7 @@ def should_skip_the_package(provider_id: str, version_suffix: str) -> tuple[bool and skip the released if it was. This allows to skip packages that have not been marked for release in this wave. For "dev" suffixes, we always build all packages. 
""" + return False, version_suffix if version_suffix != "" and not version_suffix.startswith("rc"): return False, version_suffix if version_suffix == "": diff --git a/dev/breeze/src/airflow_breeze/utils/packages.py b/dev/breeze/src/airflow_breeze/utils/packages.py index 5c22c41e0ba..2db95fcb7e6 100644 --- a/dev/breeze/src/airflow_breeze/utils/packages.py +++ b/dev/breeze/src/airflow_breeze/utils/packages.py @@ -49,7 +49,7 @@ from airflow_breeze.utils.publish_docs_helpers import ( from airflow_breeze.utils.run_utils import run_command from airflow_breeze.utils.versions import get_version_tag, strip_leading_zeros_from_version -MIN_AIRFLOW_VERSION = "2.8.0" +MIN_AIRFLOW_VERSION = "2.7.0" HTTPS_REMOTE = "apache-https-for-providers" LONG_PROVIDERS_PREFIX = "apache-airflow-providers-" diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 2d0ab90a350..43349959407 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -393,7 +393,7 @@ }, "common.compat": { "deps": [ - "apache-airflow>=2.8.0" + "apache-airflow>=2.7.0" ], "devel-deps": [], "plugins": [], @@ -966,7 +966,7 @@ "deps": [ "apache-airflow-providers-common-compat>=1.2.0", "apache-airflow-providers-common-sql>=1.6.0", - "apache-airflow>=2.8.0", + "apache-airflow>=2.7.0", "attrs>=22.2", "openlineage-integration-common>=1.22.0", "openlineage-python>=1.22.0"
