This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4be8e4db3c9 Make edge executor db access multi instance save (#44716)
4be8e4db3c9 is described below
commit 4be8e4db3c96e8ad3d51222e1a046c08513ec8bb
Author: AutomationDev85 <[email protected]>
AuthorDate: Fri Dec 6 09:49:49 2024 +0100
Make edge executor db access multi instance save (#44716)
Co-authored-by: Marco Küttelwesch <[email protected]>
---
providers/src/airflow/providers/edge/CHANGELOG.rst | 8 ++++++++
providers/src/airflow/providers/edge/__init__.py | 2 +-
providers/src/airflow/providers/edge/executors/edge_executor.py | 3 +++
providers/src/airflow/providers/edge/provider.yaml | 2 +-
4 files changed, 13 insertions(+), 2 deletions(-)
diff --git a/providers/src/airflow/providers/edge/CHANGELOG.rst
b/providers/src/airflow/providers/edge/CHANGELOG.rst
index ecf411a1d50..13a6b88c750 100644
--- a/providers/src/airflow/providers/edge/CHANGELOG.rst
+++ b/providers/src/airflow/providers/edge/CHANGELOG.rst
@@ -27,6 +27,14 @@
Changelog
---------
+0.9.1pre0
+.........
+
+Misc
+~~~~
+
+* ``Make edge executor DB access is multi instance save.``
+
0.9.0pre0
.........
diff --git a/providers/src/airflow/providers/edge/__init__.py
b/providers/src/airflow/providers/edge/__init__.py
index 5c207bef66a..066508a61c7 100644
--- a/providers/src/airflow/providers/edge/__init__.py
+++ b/providers/src/airflow/providers/edge/__init__.py
@@ -29,7 +29,7 @@ from airflow import __version__ as airflow_version
__all__ = ["__version__"]
-__version__ = "0.9.0pre0"
+__version__ = "0.9.1pre0"
if
packaging.version.parse(packaging.version.parse(airflow_version).base_version)
< packaging.version.parse(
"2.10.0"
diff --git a/providers/src/airflow/providers/edge/executors/edge_executor.py
b/providers/src/airflow/providers/edge/executors/edge_executor.py
index 4184a8ffe5b..f08f0a4fb45 100644
--- a/providers/src/airflow/providers/edge/executors/edge_executor.py
+++ b/providers/src/airflow/providers/edge/executors/edge_executor.py
@@ -135,6 +135,7 @@ class EdgeExecutor(BaseExecutor):
heartbeat_interval: int = conf.getint("edge", "heartbeat_interval")
lifeless_workers: list[EdgeWorkerModel] = (
session.query(EdgeWorkerModel)
+ .with_for_update(skip_locked=True)
.filter(
EdgeWorkerModel.state.not_in([EdgeWorkerState.UNKNOWN,
EdgeWorkerState.OFFLINE]),
EdgeWorkerModel.last_update < (timezone.utcnow() -
timedelta(seconds=heartbeat_interval * 5)),
@@ -154,6 +155,7 @@ class EdgeExecutor(BaseExecutor):
heartbeat_interval: int = conf.getint("scheduler",
"scheduler_zombie_task_threshold")
lifeless_jobs: list[EdgeJobModel] = (
session.query(EdgeJobModel)
+ .with_for_update(skip_locked=True)
.filter(
EdgeJobModel.state == TaskInstanceState.RUNNING,
EdgeJobModel.last_update < (timezone.utcnow() -
timedelta(seconds=heartbeat_interval)),
@@ -180,6 +182,7 @@ class EdgeExecutor(BaseExecutor):
job_fail_purge = conf.getint("edge", "job_fail_purge")
jobs: list[EdgeJobModel] = (
session.query(EdgeJobModel)
+ .with_for_update(skip_locked=True)
.filter(
EdgeJobModel.state.in_(
[
diff --git a/providers/src/airflow/providers/edge/provider.yaml
b/providers/src/airflow/providers/edge/provider.yaml
index 1377279ab76..845bfa225ad 100644
--- a/providers/src/airflow/providers/edge/provider.yaml
+++ b/providers/src/airflow/providers/edge/provider.yaml
@@ -27,7 +27,7 @@ source-date-epoch: 1729683247
# note that those versions are maintained by release manager - do not update
them manually
versions:
- - 0.9.0pre0
+ - 0.9.1pre0
dependencies:
- apache-airflow>=2.10.0