This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d23fe09ab1d [Edge]Add child processes to separate process group than
main (#43927)
d23fe09ab1d is described below
commit d23fe09ab1d870ec6024c537b0c53588df6df80a
Author: majorosdonat <[email protected]>
AuthorDate: Tue Nov 12 15:46:49 2024 +0100
[Edge]Add child processes to separate process group than main (#43927)
* Block SIGINT for child processes
* try start_new_session
* Add shutdown handling for SIGTERM
* Update version
* Update providers/src/airflow/providers/edge/CHANGELOG.rst
* Delete line from docs
---------
Co-authored-by: Majoros Donat (XC-DX/EET2-Bp) <[email protected]>
Co-authored-by: Jens Scheffler <[email protected]>
---
docs/apache-airflow-providers-edge/edge_executor.rst | 1 -
providers/src/airflow/providers/edge/CHANGELOG.rst | 10 ++++++++++
providers/src/airflow/providers/edge/__init__.py | 2 +-
providers/src/airflow/providers/edge/cli/edge_command.py | 9 ++++++++-
providers/src/airflow/providers/edge/provider.yaml | 2 +-
5 files changed, 20 insertions(+), 4 deletions(-)
diff --git a/docs/apache-airflow-providers-edge/edge_executor.rst
b/docs/apache-airflow-providers-edge/edge_executor.rst
index d27cb5bc698..a4e49d650d3 100644
--- a/docs/apache-airflow-providers-edge/edge_executor.rst
+++ b/docs/apache-airflow-providers-edge/edge_executor.rst
@@ -232,7 +232,6 @@ The following features are known missing and will be
implemented in increments:
- Edge Worker CLI
- Use WebSockets instead of HTTP calls for communication
- - Handle SIG-INT/CTRL+C and gracefully terminate and complete job (``airflow
edge stop`` is working though)
- Send logs also to TaskFileHandler if external logging services are used
- Integration into telemetry to send metrics from remote site
- Allow ``airflow edge stop`` to wait until completed to terminated
diff --git a/providers/src/airflow/providers/edge/CHANGELOG.rst
b/providers/src/airflow/providers/edge/CHANGELOG.rst
index ae77d32b7ee..6f2fda5db9a 100644
--- a/providers/src/airflow/providers/edge/CHANGELOG.rst
+++ b/providers/src/airflow/providers/edge/CHANGELOG.rst
@@ -27,6 +27,16 @@
Changelog
---------
+0.5.4pre0
+.........
+
+Misc
+~~~~
+
+* ``Fix SIGINT handling of child processes. Ensure graceful shutdown when
SIGINT is received (not killing working tasks).``
+* ``Fix SIGTERM handling of child processes. Ensure all children are terminated
on SIGTERM.``
+
+
0.5.3pre0
.........
diff --git a/providers/src/airflow/providers/edge/__init__.py
b/providers/src/airflow/providers/edge/__init__.py
index 2998b9d6a90..996e57a229c 100644
--- a/providers/src/airflow/providers/edge/__init__.py
+++ b/providers/src/airflow/providers/edge/__init__.py
@@ -29,7 +29,7 @@ from airflow import __version__ as airflow_version
__all__ = ["__version__"]
-__version__ = "0.5.3pre0"
+__version__ = "0.5.4pre0"
if
packaging.version.parse(packaging.version.parse(airflow_version).base_version)
< packaging.version.parse(
"2.10.0"
diff --git a/providers/src/airflow/providers/edge/cli/edge_command.py
b/providers/src/airflow/providers/edge/cli/edge_command.py
index 69f12acc3fa..be9a739dc29 100644
--- a/providers/src/airflow/providers/edge/cli/edge_command.py
+++ b/providers/src/airflow/providers/edge/cli/edge_command.py
@@ -159,6 +159,12 @@ class _EdgeWorkerCli:
logger.info("Request to show down Edge Worker received, waiting for
jobs to complete.")
_EdgeWorkerCli.drain = True
+ def shutdown_handler(self, sig, frame):
+ logger.info("SIGTERM received. Terminating all jobs and quit")
+ for job in self.jobs:
+ os.killpg(job.process.pid, signal.SIGTERM)
+ _EdgeWorkerCli.drain = True
+
def _get_sysinfo(self) -> dict:
"""Produce the sysinfo from worker to post to central site."""
return {
@@ -182,6 +188,7 @@ class _EdgeWorkerCli:
raise SystemExit(str(e))
_write_pid_to_pidfile(self.pid_file_path)
signal.signal(signal.SIGINT, _EdgeWorkerCli.signal_handler)
+ signal.signal(signal.SIGTERM, self.shutdown_handler)
try:
while not _EdgeWorkerCli.drain or self.jobs:
self.loop()
@@ -218,7 +225,7 @@ class _EdgeWorkerCli:
env["AIRFLOW__CORE__DATABASE_ACCESS_ISOLATION"] = "True"
env["AIRFLOW__CORE__INTERNAL_API_URL"] = conf.get("edge",
"api_url")
env["_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK"] = "1"
- process = Popen(edge_job.command, close_fds=True, env=env)
+ process = Popen(edge_job.command, close_fds=True, env=env,
start_new_session=True)
logfile = EdgeLogs.logfile_path(edge_job.key)
self.jobs.append(_Job(edge_job, process, logfile, 0))
EdgeJob.set_state(edge_job.key, TaskInstanceState.RUNNING)
diff --git a/providers/src/airflow/providers/edge/provider.yaml
b/providers/src/airflow/providers/edge/provider.yaml
index 34e787c528d..fd0e55fd6f6 100644
--- a/providers/src/airflow/providers/edge/provider.yaml
+++ b/providers/src/airflow/providers/edge/provider.yaml
@@ -27,7 +27,7 @@ source-date-epoch: 1729683247
# note that those versions are maintained by release manager - do not update
them manually
versions:
- - 0.5.3pre0
+ - 0.5.4pre0
dependencies:
- apache-airflow>=2.10.0